diff --git a/tests/test_data_util.py b/tests/test_data_util.py deleted file mode 100644 index 7e62185f..00000000 --- a/tests/test_data_util.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Copyright 2013 Quantopian, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from collections import deque - -import numpy as np - -import pandas as pd -import pandas.util.testing as tm - -from zipline.utils.data import RollingPanel - - -class TestRollingPanel(unittest.TestCase): - - def test_basics(self): - items = ['foo', 'bar', 'baz'] - minor = ['A', 'B', 'C', 'D'] - - window = 10 - - rp = RollingPanel(window, items, minor, cap_multiple=2) - - dates = pd.date_range('2000-01-01', periods=30, tz='utc') - - major_deque = deque() - - frames = {} - - for i in range(30): - frame = pd.DataFrame(np.random.randn(3, 4), index=items, - columns=minor) - date = dates[i] - - rp.add_frame(date, frame) - - frames[date] = frame - major_deque.append(date) - - if i >= window: - major_deque.popleft() - - result = rp.get_current() - expected = pd.Panel(frames, items=list(major_deque), - major_axis=items, minor_axis=minor) - tm.assert_panel_equal(result, expected.swapaxes(0, 1)) - - -def run_history_implementations(option='clever', n=500, copy=False): - items = range(15) - minor = range(20) - window = 100 - periods = n - - dates = pd.date_range('2000-01-01', periods=periods, tz='utc') - frames = {} - - if option == 'clever': - rp = RollingPanel(window, items, minor, cap_multiple=2) - major_deque = deque() - - for i in range(periods): - if len(minor) > 5: - minor = minor[:-1] - if len(items) > 5: - items = items[:-1] - - dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), - index=items, columns=minor) - - frame = dummy * (1 + 0.001 * i) - date = dates[i] - - rp.add_frame(date, frame) - - frames[date] = frame - major_deque.append(date) - - if i >= window: - del frames[major_deque.popleft()] - - result = rp.get_current() - if copy: - result = result.copy() - else: - major_deque = deque() - dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), - index=items, columns=minor) - - for i in range(periods): - frame = dummy * (1 + 0.001 * i) - date = dates[i] - frames[date] = frame - major_deque.append(date) - - if i >= window: - del frames[major_deque.popleft()] - - result = pd.Panel(frames, items=list(major_deque), - major_axis=items, minor_axis=minor) diff --git a/tests/test_rolling_panel.py b/tests/test_rolling_panel.py new file mode 100644 index 00000000..5f638c6a --- /dev/null +++ b/tests/test_rolling_panel.py @@ -0,0 +1,174 @@ +# +# Copyright 2014 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from collections import deque + +import numpy as np + +import pandas as pd +import pandas.util.testing as tm + +from zipline.utils.data import RollingPanel + + +class TestRollingPanel(unittest.TestCase): + + def test_basics(self, window=10): + items = ['bar', 'baz', 'foo'] + minor = ['A', 'B', 'C', 'D'] + + rp = RollingPanel(window, items, minor, cap_multiple=2) + + dates = pd.date_range('2000-01-01', periods=30, tz='utc') + + major_deque = deque(maxlen=window) + + frames = {} + + for i, date in enumerate(dates): + frame = pd.DataFrame(np.random.randn(3, 4), index=items, + columns=minor) + + rp.add_frame(date, frame) + + frames[date] = frame + major_deque.append(date) + + result = rp.get_current() + expected = pd.Panel(frames, items=list(major_deque), + major_axis=items, minor_axis=minor) + + tm.assert_panel_equal(result, expected.swapaxes(0, 1)) + + def test_adding_and_dropping_items(self, n_items=5, n_minor=10, window=10, + periods=30): + np.random.seed(123) + + items = deque(range(n_items)) + minor = deque(range(n_minor)) + + expected_items = deque(range(n_items)) + expected_minor = deque(range(n_minor)) + + first_non_existant = max(n_items, n_minor) + 1 + # We want to add new columns with random order + add_items = np.arange(first_non_existant, first_non_existant + periods) + np.random.shuffle(add_items) + + rp = RollingPanel(window, items, minor, cap_multiple=2) + + dates = pd.date_range('2000-01-01', periods=periods, tz='utc') + + frames = {} + + expected_frames = deque(maxlen=window) + expected_dates = deque() + + for i, (date, add_item) in enumerate(zip(dates, add_items)): + frame = pd.DataFrame(np.random.randn(n_items, n_minor), + index=items, columns=minor) + + if i >= window: + # Old labels and dates should start to get dropped at every + # call + del frames[expected_dates.popleft()] + expected_minor.popleft() + expected_items.popleft() + + expected_frames.append(frame) + expected_dates.append(date) + + rp.add_frame(date, frame) + + frames[date] = frame + + result = rp.get_current() + np.testing.assert_array_equal(sorted(result.minor_axis.values), + sorted(expected_minor)) + np.testing.assert_array_equal(sorted(result.items.values), + sorted(expected_items)) + tm.assert_frame_equal(frame.T, + result.ix[frame.index, -1, frame.columns]) + expected_result = pd.Panel(frames).swapaxes(0, 1) + tm.assert_panel_equal(expected_result, + result) + + # Insert new items + minor.popleft() + minor.append(add_item) + items.popleft() + items.append(add_item) + + expected_minor.append(add_item) + expected_items.append(add_item) + + +def run_history_implementations(option='clever', n=500, change_fields=False, + copy=False, n_items=15, n_minor=20, + change_freq=5, window=100): + items = range(n_items) + minor = range(n_minor) + periods = n + + dates = pd.date_range('2000-01-01', periods=periods, tz='utc') + frames = {} + + if option == 'clever': + rp = RollingPanel(window, items, minor, cap_multiple=2) + major_deque = deque() + + for i in range(periods): + # Add a new and drop an field every change_freq iterations + if change_fields and (i % change_freq) == 0: + minor = minor[1:] + minor.append(minor[-1] + 1) + items = items[1:] + items.append(items[-1] + 1) + + dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), + index=items, columns=minor) + + frame = dummy * (1 + 0.001 * i) + date = dates[i] + + rp.add_frame(date, frame) + + frames[date] = frame + major_deque.append(date) + + if i >= window: + del frames[major_deque.popleft()] + + result = rp.get_current() + if copy: + result = result.copy() + else: + major_deque = deque() + dummy = pd.DataFrame(np.random.randn(len(items), len(minor)), + index=items, columns=minor) + + for i in range(periods): + frame = dummy * (1 + 0.001 * i) + date = dates[i] + frames[date] = frame + major_deque.append(date) + + if i >= window: + del frames[major_deque.popleft()] + + result = pd.Panel(frames, items=list(major_deque), + major_axis=items, minor_axis=minor) diff --git a/zipline/utils/data.py b/zipline/utils/data.py index 5dbd8c31..427713f8 100644 --- a/zipline/utils/data.py +++ b/zipline/utils/data.py @@ -20,7 +20,7 @@ from copy import deepcopy def _ensure_index(x): if not isinstance(x, pd.Index): - x = pd.Index(x) + x = pd.Index(sorted(x)) return x @@ -59,22 +59,42 @@ class RollingPanel(object): return panel def _update_buffer(self, frame): - # Drop outdated, nan-filled minors (sids) and items (fields) - non_nan_cols = set(self.buffer.dropna(axis=1).minor_axis) - new_cols = set(frame.columns) + # Get current frame as we only need to care about the data that is in + # the active window + # Note that we have to increase pos so that we get the current frame as + # self.pos is increased _after_ this call + old_buffer = self.get_current(self.pos + 1) + + nans = pd.isnull(old_buffer) + + # Find minor_axes that have only nans + # Note that minor is axis 2 + non_nan_cols = set(old_buffer.minor_axis[~np.all(nans, axis=(0, 1))]) + # Determine new columns to be added + new_cols = set(frame.columns).difference(non_nan_cols) + # Update internal minor axis self.minor_axis = _ensure_index(new_cols.union(non_nan_cols)) - non_nan_items = set(self.buffer.dropna(axis=1).items) - new_items = set(frame.index) + # Same for items (fields) + # Find items axes that have only nans + # Note that items is axis 0 + non_nan_items = set(old_buffer.items[~np.all(nans, axis=(1, 2))]) + new_items = set(frame.index).difference(non_nan_items) self.items = _ensure_index(new_items.union(non_nan_items)) + # :NOTE: + # There is a simpler and 10x faster way to do this: + # + # Reindex buffer to update axes (automatically adds nans) + # self.buffer = self.buffer.reindex(items=self.items, + # major_axis=np.arange(self.cap), + # minor_axis=self.minor_axis) + # + # However, pandas==0.12.0, for which we remain backwards compatible, + # has a bug in .reindex() that this triggers. Using .update() as before + # seems to work fine. + new_buffer = self._create_buffer() - # Copy old values we want to keep - # .update() is pretty slow. Ideally we would be using - # new_buffer.loc[non_nan_items, :, non_nan_cols] = - # but this triggers a bug in Pandas 0.11. Update - # this when 0.12 is released. - # https://github.com/pydata/pandas/issues/3777 new_buffer.update( self.buffer.loc[non_nan_items, :, non_nan_cols]) @@ -90,30 +110,37 @@ class RollingPanel(object): set(frame.index).difference(set(self.items)): self._update_buffer(frame) - self.buffer.loc[:, self.pos, :] = frame.ix[self.items].T + self.buffer.loc[:, self.pos, :] = \ + frame.ix[self.items].T.astype(self.dtype) self.index_buf[self.pos] = tick self.pos += 1 - def get_current(self): + def get_current(self, pos=None): """ Get a Panel that is the current data in view. It is not safe to persist these objects because internal data might change """ - where = slice(max(self.pos - self.window, 0), self.pos) + if pos is None: + pos = self.pos + + where = slice(max(pos - self.window, 0), pos) major_axis = pd.DatetimeIndex(deepcopy(self.index_buf[where]), tz='utc') return pd.Panel(self.buffer.values[:, where, :], self.items, - major_axis, self.minor_axis) + major_axis, self.minor_axis, dtype=self.dtype) def _roll_data(self): """ Roll window worth of data up to position zero. Save the effort of having to expensively roll at each iteration """ + self.buffer.values[:, :self.window, :] = \ - self.buffer.values[:, -self.window:] + self.buffer.values[:, -self.window:, :] + # Clean out nans so that they get dropped in _update_buffer() + self.buffer.values[:, -self.window:, :] = np.nan self.index_buf[:self.window] = self.index_buf[-self.window:] self.pos = self.window