From 96bdb22db926b84cc6df59fa790d1e38799d0d96 Mon Sep 17 00:00:00 2001 From: Thomas Wiecki Date: Wed, 4 Jun 2014 16:28:53 +0200 Subject: [PATCH] BUG: RollingPanel was not behaving correctly in corner cases. There were quite a few bugs in certain corner cases. Dropping of obsolete axes was not working correctly; a roll-over could cause obsolete axes not to be dropped. The tests are much more stringent now as well. --- tests/test_data_util.py | 114 ----------------------- tests/test_rolling_panel.py | 174 ++++++++++++++++++++++++++++++++++++ zipline/utils/data.py | 61 +++++++++---- 3 files changed, 218 insertions(+), 131 deletions(-) delete mode 100644 tests/test_data_util.py create mode 100644 tests/test_rolling_panel.py diff --git a/tests/test_data_util.py b/tests/test_data_util.py deleted file mode 100644 index 7e62185f..00000000 --- a/tests/test_data_util.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Copyright 2013 Quantopian, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import unittest - -from collections import deque - -import numpy as np - -import pandas as pd -import pandas.util.testing as tm - -from zipline.utils.data import RollingPanel - - -class TestRollingPanel(unittest.TestCase): - - def test_basics(self): - items = ['foo', 'bar', 'baz'] - minor = ['A', 'B', 'C', 'D'] - - window = 10 - - rp = RollingPanel(window, items, minor, cap_multiple=2) - - dates = pd.date_range('2000-01-01', periods=30, tz='utc') - - major_deque = deque() - - frames = {} - - for i in range(30): - frame = pd.DataFrame(np.random.randn(3, 4), index=items, - columns=minor) - date = dates[i] - - rp.add_frame(date, frame) - - frames[date] = frame - major_deque.append(date) - - if i >= window: - major_deque.popleft() - - result = rp.get_current() - expected = pd.Panel(frames, items=list(major_deque), - major_axis=items, minor_axis=minor) - tm.assert_panel_equal(result, expected.swapaxes(0, 1)) - - -def run_history_implementations(option='clever', n=500, copy=False): - items = range(15) - minor = range(20) - window = 100 - periods = n - - dates = pd.date_range('2000-01-01', periods=periods, tz='utc') - frames = {} - - if option == 'clever': - rp = RollingPanel(window, items, minor, cap_multiple=2) - major_deque = deque() - - for i in range(periods): - if len(minor) > 5: - minor = minor[:-1] - if len(items) > 5: - items = items[:-1] - - dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), - index=items, columns=minor) - - frame = dummy * (1 + 0.001 * i) - date = dates[i] - - rp.add_frame(date, frame) - - frames[date] = frame - major_deque.append(date) - - if i >= window: - del frames[major_deque.popleft()] - - result = rp.get_current() - if copy: - result = result.copy() - else: - major_deque = deque() - dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), - index=items, columns=minor) - - for i in range(periods): - frame = dummy * (1 + 0.001 * i) - date = dates[i] - frames[date] = frame - major_deque.append(date) - - if i >= window: - 
del frames[major_deque.popleft()] - - result = pd.Panel(frames, items=list(major_deque), - major_axis=items, minor_axis=minor) diff --git a/tests/test_rolling_panel.py b/tests/test_rolling_panel.py new file mode 100644 index 00000000..5f638c6a --- /dev/null +++ b/tests/test_rolling_panel.py @@ -0,0 +1,174 @@ +# +# Copyright 2014 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from collections import deque + +import numpy as np + +import pandas as pd +import pandas.util.testing as tm + +from zipline.utils.data import RollingPanel + + +class TestRollingPanel(unittest.TestCase): + + def test_basics(self, window=10): + items = ['bar', 'baz', 'foo'] + minor = ['A', 'B', 'C', 'D'] + + rp = RollingPanel(window, items, minor, cap_multiple=2) + + dates = pd.date_range('2000-01-01', periods=30, tz='utc') + + major_deque = deque(maxlen=window) + + frames = {} + + for i, date in enumerate(dates): + frame = pd.DataFrame(np.random.randn(3, 4), index=items, + columns=minor) + + rp.add_frame(date, frame) + + frames[date] = frame + major_deque.append(date) + + result = rp.get_current() + expected = pd.Panel(frames, items=list(major_deque), + major_axis=items, minor_axis=minor) + + tm.assert_panel_equal(result, expected.swapaxes(0, 1)) + + def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10, + periods=30): + np.random.seed(123) + + items = deque(range(n_items)) + minor = deque(range(n_minor)) + + expected_items = 
deque(range(n_items)) + expected_minor = deque(range(n_minor)) + + first_non_existant = max(n_items, n_minor) + 1 + # We want to add new columns with random order + add_items = np.arange(first_non_existant, first_non_existant + periods) + np.random.shuffle(add_items) + + rp = RollingPanel(window, items, minor, cap_multiple=2) + + dates = pd.date_range('2000-01-01', periods=periods, tz='utc') + + frames = {} + + expected_frames = deque(maxlen=window) + expected_dates = deque() + + for i, (date, add_item) in enumerate(zip(dates, add_items)): + frame = pd.DataFrame(np.random.randn(n_items, n_minor), + index=items, columns=minor) + + if i >= window: + # Old labels and dates should start to get dropped at every + # call + del frames[expected_dates.popleft()] + expected_minor.popleft() + expected_items.popleft() + + expected_frames.append(frame) + expected_dates.append(date) + + rp.add_frame(date, frame) + + frames[date] = frame + + result = rp.get_current() + np.testing.assert_array_equal(sorted(result.minor_axis.values), + sorted(expected_minor)) + np.testing.assert_array_equal(sorted(result.items.values), + sorted(expected_items)) + tm.assert_frame_equal(frame.T, + result.ix[frame.index, -1, frame.columns]) + expected_result = pd.Panel(frames).swapaxes(0, 1) + tm.assert_panel_equal(expected_result, + result) + + # Insert new items + minor.popleft() + minor.append(add_item) + items.popleft() + items.append(add_item) + + expected_minor.append(add_item) + expected_items.append(add_item) + + +def run_history_implementations(option='clever', n=500, change_fields=False, + copy=False, n_items=15, n_minor=20, + change_freq=5, window=100): + items = range(n_items) + minor = range(n_minor) + periods = n + + dates = pd.date_range('2000-01-01', periods=periods, tz='utc') + frames = {} + + if option == 'clever': + rp = RollingPanel(window, items, minor, cap_multiple=2) + major_deque = deque() + + for i in range(periods): + # Add a new and drop an field every change_freq iterations 
+ if change_fields and (i % change_freq) == 0: + minor = minor[1:] + minor.append(minor[-1] + 1) + items = items[1:] + items.append(items[-1] + 1) + + dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), + index=items, columns=minor) + + frame = dummy * (1 + 0.001 * i) + date = dates[i] + + rp.add_frame(date, frame) + + frames[date] = frame + major_deque.append(date) + + if i >= window: + del frames[major_deque.popleft()] + + result = rp.get_current() + if copy: + result = result.copy() + else: + major_deque = deque() + dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), + index=items, columns=minor) + + for i in range(periods): + frame = dummy * (1 + 0.001 * i) + date = dates[i] + frames[date] = frame + major_deque.append(date) + + if i >= window: + del frames[major_deque.popleft()] + + result = pd.Panel(frames, items=list(major_deque), + major_axis=items, minor_axis=minor) diff --git a/zipline/utils/data.py b/zipline/utils/data.py index 5dbd8c31..427713f8 100644 --- a/zipline/utils/data.py +++ b/zipline/utils/data.py @@ -20,7 +20,7 @@ from copy import deepcopy def _ensure_index(x): if not isinstance(x, pd.Index): - x = pd.Index(x) + x = pd.Index(sorted(x)) return x @@ -59,22 +59,42 @@ class RollingPanel(object): return panel def _update_buffer(self, frame): - # Drop outdated, nan-filled minors (sids) and items (fields) - non_nan_cols = set(self.buffer.dropna(axis=1).minor_axis) - new_cols = set(frame.columns) + # Get current frame as we only need to care about the data that is in + # the active window + # Note that we have to increase pos so that we get the current frame as + # self.pos is increased _after_ this call + old_buffer = self.get_current(self.pos + 1) + + nans = pd.isnull(old_buffer) + + # Find minor_axes that have only nans + # Note that minor is axis 2 + non_nan_cols = set(old_buffer.minor_axis[~np.all(nans, axis=(0, 1))]) + # Determine new columns to be added + new_cols = set(frame.columns).difference(non_nan_cols) + # Update 
internal minor axis self.minor_axis = _ensure_index(new_cols.union(non_nan_cols)) - non_nan_items = set(self.buffer.dropna(axis=1).items) - new_items = set(frame.index) + # Same for items (fields) + # Find items axes that have only nans + # Note that items is axis 0 + non_nan_items = set(old_buffer.items[~np.all(nans, axis=(1, 2))]) + new_items = set(frame.index).difference(non_nan_items) self.items = _ensure_index(new_items.union(non_nan_items)) + # :NOTE: + # There is a simpler and 10x faster way to do this: + # + # Reindex buffer to update axes (automatically adds nans) + # self.buffer = self.buffer.reindex(items=self.items, + # major_axis=np.arange(self.cap), + # minor_axis=self.minor_axis) + # + # However, pandas==0.12.0, for which we remain backwards compatible, + # has a bug in .reindex() that this triggers. Using .update() as before + # seems to work fine. + new_buffer = self._create_buffer() - # Copy old values we want to keep - # .update() is pretty slow. Ideally we would be using - # new_buffer.loc[non_nan_items, :, non_nan_cols] = - # but this triggers a bug in Pandas 0.11. Update - # this when 0.12 is released. - # https://github.com/pydata/pandas/issues/3777 new_buffer.update( self.buffer.loc[non_nan_items, :, non_nan_cols]) @@ -90,30 +110,37 @@ class RollingPanel(object): set(frame.index).difference(set(self.items)): self._update_buffer(frame) - self.buffer.loc[:, self.pos, :] = frame.ix[self.items].T + self.buffer.loc[:, self.pos, :] = \ + frame.ix[self.items].T.astype(self.dtype) self.index_buf[self.pos] = tick self.pos += 1 - def get_current(self): + def get_current(self, pos=None): """ Get a Panel that is the current data in view. 
It is not safe to persist these objects because internal data might change """ - where = slice(max(self.pos - self.window, 0), self.pos) + if pos is None: + pos = self.pos + + where = slice(max(pos - self.window, 0), pos) major_axis = pd.DatetimeIndex(deepcopy(self.index_buf[where]), tz='utc') return pd.Panel(self.buffer.values[:, where, :], self.items, - major_axis, self.minor_axis) + major_axis, self.minor_axis, dtype=self.dtype) def _roll_data(self): """ Roll window worth of data up to position zero. Save the effort of having to expensively roll at each iteration """ + self.buffer.values[:, :self.window, :] = \ - self.buffer.values[:, -self.window:] + self.buffer.values[:, -self.window:, :] + # Clean out nans so that they get dropped in _update_buffer() + self.buffer.values[:, -self.window:, :] = np.nan self.index_buf[:self.window] = self.index_buf[-self.window:] self.pos = self.window