diff --git a/tests/test_daily_history_aggregator.py b/tests/test_daily_history_aggregator.py new file mode 100644 index 00000000..2a234d31 --- /dev/null +++ b/tests/test_daily_history_aggregator.py @@ -0,0 +1,279 @@ +# +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from numbers import Real + +from nose_parameterized import parameterized +from numpy.testing import assert_almost_equal +from numpy import nan +import pandas as pd + +from zipline.data.data_portal import DailyHistoryAggregator + +from zipline.testing.fixtures import ( + WithBcolzEquityMinuteBarReader, + ZiplineTestCase, +) + +OHLC = ['open', 'high', 'low', 'close'] +OHLCV = OHLC + ['volume'] + + +class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, + ZiplineTestCase): + + # March 2016 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + TRADING_ENV_MIN_DATE = START_DATE = pd.Timestamp( + '2016-03-01', tz='UTC', + ) + TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp( + '2016-03-31', tz='UTC', + ) + ASSET_FINDER_EQUITY_SIDS = 1, 2 + + minutes = pd.date_range('2016-03-15 9:31', + '2016-03-15 9:36', + freq='min', + tz='US/Eastern').tz_convert('UTC') + + @classmethod + def make_equity_minute_bar_data(cls): + # sid data is created so that at least one high is lower than a + # previous high, and the inverse for low + yield 1, pd.DataFrame( + { + 'open': [nan, 103.50, 102.50, 104.50, 101.50, nan], + 'high': [nan, 103.90, 102.90, 104.90, 101.90, nan], + 'low': [nan, 103.10, 102.10, 104.10, 101.10, nan], + 'close': [nan, 103.30, 102.30, 104.30, 101.30, nan], + 'volume': [0, 1003, 1002, 1004, 1001, 0] + }, + index=cls.minutes, + ) + # sid 2 is included to provide data on different bars than sid 1, + # as will as illiquidty mid-day + yield 2, pd.DataFrame( + { + 'open': [201.50, nan, 204.50, nan, 200.50, 202.50], + 'high': [201.90, nan, 204.90, nan, 200.90, 202.90], + 'low': [201.10, nan, 204.10, nan, 200.10, 202.10], + 'close': [201.30, nan, 203.50, nan, 200.30, 202.30], + 'volume': [2001, 0, 2004, 0, 2000, 2002], + }, + index=cls.minutes, + ) + + expected_values = { + 1: pd.DataFrame( + { + 'open': [nan, 103.50, 103.50, 103.50, 103.50, 103.50], + 'high': [nan, 103.90, 103.90, 104.90, 104.90, 104.90], + 'low': [nan, 103.10, 102.10, 102.10, 101.10, 101.10], + 'close': [nan, 103.30, 102.30, 104.30, 101.30, 101.30], + 'volume': [0, 1003, 2005, 3009, 4010, 4010] + }, + index=minutes, + ), + 2: pd.DataFrame( + { + 'open': [201.50, 201.50, 201.50, 201.50, 201.50, 201.50], + 'high': [201.90, 201.90, 204.90, 204.90, 204.90, 204.90], + 'low': [201.10, 201.10, 201.10, 201.10, 200.10, 200.10], + 'close': [201.30, 201.30, 203.50, 203.50, 200.30, 202.30], + 'volume': [2001, 2001, 4005, 4005, 6005, 8007], + }, + index=minutes, + ) + } + + @classmethod + def init_class_fixtures(cls): + super(MinuteToDailyAggregationTestCase, cls).init_class_fixtures() + + cls.EQUITIES = { + 1: cls.env.asset_finder.retrieve_asset(1), + 2: cls.env.asset_finder.retrieve_asset(2) + } + + def init_instance_fixtures(self): + super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures() + # Set up a fresh data portal for each test, since order of calling + # needs to be tested. + self.equity_daily_aggregator = DailyHistoryAggregator( + self.trading_calendar.schedule.market_open, + self.bcolz_equity_minute_bar_reader, + ) + + @parameterized.expand([ + ('open_sid_1', 'open', 1), + ('high_1', 'high', 1), + ('low_1', 'low', 1), + ('close_1', 'close', 1), + ('volume_1', 'volume', 1), + ('open_2', 'open', 2), + ('high_2', 'high', 2), + ('low_2', 'low', 2), + ('close_2', 'close', 2), + ('volume_2', 'volume', 2), + + ]) + def test_contiguous_minutes_individual(self, name, field, sid): + # First test each minute in order. + method_name = field + 's' + results = [] + repeat_results = [] + asset = self.EQUITIES[sid] + for minute in self.minutes: + value = getattr(self.equity_daily_aggregator, method_name)( + [asset], minute)[0] + # Prevent regression on building an array when scalar is intended. + self.assertIsInstance(value, Real) + results.append(value) + + # Call a second time with the same dt, to prevent regression + # against case where crossed start and end dts caused a crash + # instead of the last value. + value = getattr(self.equity_daily_aggregator, method_name)( + [asset], minute)[0] + # Prevent regression on building an array when scalar is intended. + self.assertIsInstance(value, Real) + repeat_results.append(value) + + assert_almost_equal(results, self.expected_values[asset][field], + err_msg='sid={0} field={1}'.format(asset, field)) + assert_almost_equal(repeat_results, self.expected_values[asset][field], + err_msg='sid={0} field={1}'.format(asset, field)) + + @parameterized.expand([ + ('open_sid_1', 'open', 1), + ('high_1', 'high', 1), + ('low_1', 'low', 1), + ('close_1', 'close', 1), + ('volume_1', 'volume', 1), + ('open_2', 'open', 2), + ('high_2', 'high', 2), + ('low_2', 'low', 2), + ('close_2', 'close', 2), + ('volume_2', 'volume', 2), + + ]) + def test_skip_minutes_individual(self, name, field, sid): + # Test skipping minutes, to exercise backfills. + # Tests initial backfill and mid day backfill. + method_name = field + 's' + for i in [1, 5]: + minute = self.minutes[i] + asset = self.EQUITIES[sid] + value = getattr(self.equity_daily_aggregator, method_name)( + [asset], minute)[0] + # Prevent regression on building an array when scalar is intended. + self.assertIsInstance(value, Real) + assert_almost_equal(value, + self.expected_values[sid][field][i], + err_msg='sid={0} field={1} dt={2}'.format( + sid, field, minute)) + + # Call a second time with the same dt, to prevent regression + # against case where crossed start and end dts caused a crash + # instead of the last value. + value = getattr(self.equity_daily_aggregator, method_name)( + [asset], minute)[0] + # Prevent regression on building an array when scalar is intended. + self.assertIsInstance(value, Real) + assert_almost_equal(value, + self.expected_values[sid][field][i], + err_msg='sid={0} field={1} dt={2}'.format( + sid, field, minute)) + + @parameterized.expand(OHLCV) + def test_contiguous_minutes_multiple(self, field): + # First test each minute in order. + method_name = field + 's' + assets = sorted(self.EQUITIES.values()) + results = {asset: [] for asset in assets} + repeat_results = {asset: [] for asset in assets} + for minute in self.minutes: + values = getattr(self.equity_daily_aggregator, method_name)( + assets, minute) + for j, asset in enumerate(assets): + value = values[j] + # Prevent regression on building an array when scalar is + # intended. + self.assertIsInstance(value, Real) + results[asset].append(value) + + # Call a second time with the same dt, to prevent regression + # against case where crossed start and end dts caused a crash + # instead of the last value. + values = getattr(self.equity_daily_aggregator, method_name)( + assets, minute) + for j, asset in enumerate(assets): + value = values[j] + # Prevent regression on building an array when scalar is + # intended. + self.assertIsInstance(value, Real) + repeat_results[asset].append(value) + for asset in assets: + assert_almost_equal(results[asset], + self.expected_values[asset][field], + err_msg='sid={0} field={1}'.format( + asset, field)) + assert_almost_equal(repeat_results[asset], + self.expected_values[asset][field], + err_msg='sid={0} field={1}'.format( + asset, field)) + + @parameterized.expand(OHLCV) + def test_skip_minutes_multiple(self, field): + # Test skipping minutes, to exercise backfills. + # Tests initial backfill and mid day backfill. + method_name = field + 's' + assets = sorted(self.EQUITIES.values()) + for i in [1, 5]: + minute = self.minutes[i] + values = getattr(self.equity_daily_aggregator, method_name)( + assets, minute) + for j, asset in enumerate(assets): + value = values[j] + # Prevent regression on building an array when scalar is + # intended. + self.assertIsInstance(value, Real) + assert_almost_equal( + value, + self.expected_values[asset][field][i], + err_msg='sid={0} field={1} dt={2}'.format( + asset, field, minute)) + + # Call a second time with the same dt, to prevent regression + # against case where crossed start and end dts caused a crash + # instead of the last value. + values = getattr(self.equity_daily_aggregator, method_name)( + assets, minute) + for j, asset in enumerate(assets): + value = values[j] + # Prevent regression on building an array when scalar is + # intended. + self.assertIsInstance(value, Real) + assert_almost_equal( + value, + self.expected_values[asset][field][i], + err_msg='sid={0} field={1} dt={2}'.format( + asset, field, minute)) diff --git a/tests/test_history.py b/tests/test_history.py index df020e25..1824aa24 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -1,18 +1,28 @@ +# +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from textwrap import dedent -from numbers import Real - from nose_parameterized import parameterized import numpy as np from numpy import nan -from numpy.testing import assert_almost_equal import pandas as pd from six import iteritems from zipline import TradingAlgorithm from zipline._protocol import handle_non_market_minutes from zipline.assets import Asset -from zipline.data.data_portal import DailyHistoryAggregator from zipline.errors import ( HistoryInInitialize, HistoryWindowStartsBeforeData, @@ -25,7 +35,6 @@ from zipline.testing import ( MockDailyBarReader, ) from zipline.testing.fixtures import ( - WithBcolzEquityMinuteBarReader, WithDataPortal, ZiplineTestCase, alias, @@ -33,7 +42,6 @@ from zipline.testing.fixtures import ( OHLC = ['open', 'high', 'low', 'close'] -OHLCV = OHLC + ['volume'] OHLCP = OHLC + ['price'] ALL_FIELDS = OHLCP + ['volume'] @@ -1712,252 +1720,3 @@ class DailyEquityHistoryTestCase(WithHistory, ZiplineTestCase): window_2[self.ASSET1].values) np.testing.assert_almost_equal(window_1[self.ASSET2].values, window_2[self.ASSET2].values) - - -class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, - ZiplineTestCase): - - # March 2016 - # Su Mo Tu We Th Fr Sa - # 1 2 3 4 5 - # 6 7 8 9 10 11 12 - # 13 14 15 16 17 18 19 - # 20 21 22 23 24 25 26 - # 27 28 29 30 31 - - TRADING_ENV_MIN_DATE = START_DATE = pd.Timestamp( - '2016-03-01', tz='UTC', - ) - TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp( - '2016-03-31', tz='UTC', - ) - ASSET_FINDER_EQUITY_SIDS = 1, 2 - - minutes = pd.date_range('2016-03-15 9:31', - '2016-03-15 9:36', - freq='min', - tz='US/Eastern').tz_convert('UTC') - - @classmethod - def make_equity_minute_bar_data(cls): - # sid data is created so that at least one high is lower than a - # previous high, and the inverse for low - yield 1, pd.DataFrame( - { - 'open': [nan, 103.50, 102.50, 104.50, 101.50, nan], - 'high': [nan, 103.90, 102.90, 104.90, 101.90, nan], - 'low': [nan, 103.10, 102.10, 104.10, 101.10, nan], - 'close': [nan, 103.30, 102.30, 104.30, 101.30, nan], - 'volume': [0, 1003, 1002, 1004, 1001, 0] - }, - index=cls.minutes, - ) - # sid 2 is included to provide data on different bars than sid 1, - # as will as illiquidty mid-day - yield 2, pd.DataFrame( - { - 'open': [201.50, nan, 204.50, nan, 200.50, 202.50], - 'high': [201.90, nan, 204.90, nan, 200.90, 202.90], - 'low': [201.10, nan, 204.10, nan, 200.10, 202.10], - 'close': [201.30, nan, 203.50, nan, 200.30, 202.30], - 'volume': [2001, 0, 2004, 0, 2000, 2002], - }, - index=cls.minutes, - ) - - expected_values = { - 1: pd.DataFrame( - { - 'open': [nan, 103.50, 103.50, 103.50, 103.50, 103.50], - 'high': [nan, 103.90, 103.90, 104.90, 104.90, 104.90], - 'low': [nan, 103.10, 102.10, 102.10, 101.10, 101.10], - 'close': [nan, 103.30, 102.30, 104.30, 101.30, 101.30], - 'volume': [0, 1003, 2005, 3009, 4010, 4010] - }, - index=minutes, - ), - 2: pd.DataFrame( - { - 'open': [201.50, 201.50, 201.50, 201.50, 201.50, 201.50], - 'high': [201.90, 201.90, 204.90, 204.90, 204.90, 204.90], - 'low': [201.10, 201.10, 201.10, 201.10, 200.10, 200.10], - 'close': [201.30, 201.30, 203.50, 203.50, 200.30, 202.30], - 'volume': [2001, 2001, 4005, 4005, 6005, 8007], - }, - index=minutes, - ) - } - - @classmethod - def init_class_fixtures(cls): - super(MinuteToDailyAggregationTestCase, cls).init_class_fixtures() - - cls.EQUITIES = { - 1: cls.env.asset_finder.retrieve_asset(1), - 2: cls.env.asset_finder.retrieve_asset(2) - } - - def init_instance_fixtures(self): - super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures() - # Set up a fresh data portal for each test, since order of calling - # needs to be tested. - self.equity_daily_aggregator = DailyHistoryAggregator( - self.trading_calendar.schedule.market_open, - self.bcolz_equity_minute_bar_reader, - ) - - @parameterized.expand([ - ('open_sid_1', 'open', 1), - ('high_1', 'high', 1), - ('low_1', 'low', 1), - ('close_1', 'close', 1), - ('volume_1', 'volume', 1), - ('open_2', 'open', 2), - ('high_2', 'high', 2), - ('low_2', 'low', 2), - ('close_2', 'close', 2), - ('volume_2', 'volume', 2), - - ]) - def test_contiguous_minutes_individual(self, name, field, sid): - # First test each minute in order. - method_name = field + 's' - results = [] - repeat_results = [] - asset = self.EQUITIES[sid] - for minute in self.minutes: - value = getattr(self.equity_daily_aggregator, method_name)( - [asset], minute)[0] - # Prevent regression on building an array when scalar is intended. - self.assertIsInstance(value, Real) - results.append(value) - - # Call a second time with the same dt, to prevent regression - # against case where crossed start and end dts caused a crash - # instead of the last value. - value = getattr(self.equity_daily_aggregator, method_name)( - [asset], minute)[0] - # Prevent regression on building an array when scalar is intended. - self.assertIsInstance(value, Real) - repeat_results.append(value) - - assert_almost_equal(results, self.expected_values[asset][field], - err_msg='sid={0} field={1}'.format(asset, field)) - assert_almost_equal(repeat_results, self.expected_values[asset][field], - err_msg='sid={0} field={1}'.format(asset, field)) - - @parameterized.expand([ - ('open_sid_1', 'open', 1), - ('high_1', 'high', 1), - ('low_1', 'low', 1), - ('close_1', 'close', 1), - ('volume_1', 'volume', 1), - ('open_2', 'open', 2), - ('high_2', 'high', 2), - ('low_2', 'low', 2), - ('close_2', 'close', 2), - ('volume_2', 'volume', 2), - - ]) - def test_skip_minutes_individual(self, name, field, sid): - # Test skipping minutes, to exercise backfills. - # Tests initial backfill and mid day backfill. - method_name = field + 's' - for i in [1, 5]: - minute = self.minutes[i] - asset = self.EQUITIES[sid] - value = getattr(self.equity_daily_aggregator, method_name)( - [asset], minute)[0] - # Prevent regression on building an array when scalar is intended. - self.assertIsInstance(value, Real) - assert_almost_equal(value, - self.expected_values[sid][field][i], - err_msg='sid={0} field={1} dt={2}'.format( - sid, field, minute)) - - # Call a second time with the same dt, to prevent regression - # against case where crossed start and end dts caused a crash - # instead of the last value. - value = getattr(self.equity_daily_aggregator, method_name)( - [asset], minute)[0] - # Prevent regression on building an array when scalar is intended. - self.assertIsInstance(value, Real) - assert_almost_equal(value, - self.expected_values[sid][field][i], - err_msg='sid={0} field={1} dt={2}'.format( - sid, field, minute)) - - @parameterized.expand(OHLCV) - def test_contiguous_minutes_multiple(self, field): - # First test each minute in order. - method_name = field + 's' - assets = sorted(self.EQUITIES.values()) - results = {asset: [] for asset in assets} - repeat_results = {asset: [] for asset in assets} - for minute in self.minutes: - values = getattr(self.equity_daily_aggregator, method_name)( - assets, minute) - for j, asset in enumerate(assets): - value = values[j] - # Prevent regression on building an array when scalar is - # intended. - self.assertIsInstance(value, Real) - results[asset].append(value) - - # Call a second time with the same dt, to prevent regression - # against case where crossed start and end dts caused a crash - # instead of the last value. - values = getattr(self.equity_daily_aggregator, method_name)( - assets, minute) - for j, asset in enumerate(assets): - value = values[j] - # Prevent regression on building an array when scalar is - # intended. - self.assertIsInstance(value, Real) - repeat_results[asset].append(value) - for asset in assets: - assert_almost_equal(results[asset], - self.expected_values[asset][field], - err_msg='sid={0} field={1}'.format( - asset, field)) - assert_almost_equal(repeat_results[asset], - self.expected_values[asset][field], - err_msg='sid={0} field={1}'.format( - asset, field)) - - @parameterized.expand(OHLCV) - def test_skip_minutes_multiple(self, field): - # Test skipping minutes, to exercise backfills. - # Tests initial backfill and mid day backfill. - method_name = field + 's' - assets = sorted(self.EQUITIES.values()) - for i in [1, 5]: - minute = self.minutes[i] - values = getattr(self.equity_daily_aggregator, method_name)( - assets, minute) - for j, asset in enumerate(assets): - value = values[j] - # Prevent regression on building an array when scalar is - # intended. - self.assertIsInstance(value, Real) - assert_almost_equal( - value, - self.expected_values[asset][field][i], - err_msg='sid={0} field={1} dt={2}'.format( - asset, field, minute)) - - # Call a second time with the same dt, to prevent regression - # against case where crossed start and end dts caused a crash - # instead of the last value. - values = getattr(self.equity_daily_aggregator, method_name)( - assets, minute) - for j, asset in enumerate(assets): - value = values[j] - # Prevent regression on building an array when scalar is - # intended. - self.assertIsInstance(value, Real) - assert_almost_equal( - value, - self.expected_values[asset][field][i], - err_msg='sid={0} field={1} dt={2}'.format( - asset, field, minute)) diff --git a/zipline/data/daily_history_aggregator.py b/zipline/data/daily_history_aggregator.py new file mode 100644 index 00000000..cb108004 --- /dev/null +++ b/zipline/data/daily_history_aggregator.py @@ -0,0 +1,417 @@ +# +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import numpy as np +import pandas as pd + +from pandas.tslib import normalize_date + + +class DailyHistoryAggregator(object): + """ + Converts minute pricing data into a daily summary, to be used for the + last slot in a call to history with a frequency of `1d`. + + This summary is the same as a daily bar rollup of minute data, with the + distinction that the summary is truncated to the `dt` requested. + i.e. the aggregation slides forward during a the course of simulation day. + + Provides aggregation for `open`, `high`, `low`, `close`, and `volume`. + The aggregation rules for each price type is documented in their respective + + """ + + def __init__(self, market_opens, minute_reader): + self._market_opens = market_opens + self._minute_reader = minute_reader + + # The caches are structured as (date, market_open, entries), where + # entries is a dict of asset -> (last_visited_dt, value) + # + # Whenever an aggregation method determines the current value, + # the entry for the respective asset should be overwritten with a new + # entry for the current dt.value (int) and aggregation value. + # + # When the requested dt's date is different from date the cache is + # flushed, so that the cache entries do not grow unbounded. + # + # Example cache: + # cache = (date(2016, 3, 17), + # pd.Timestamp('2016-03-17 13:31', tz='UTC'), + # { + # 1: (1458221460000000000, np.nan), + # 2: (1458221460000000000, 42.0), + # }) + self._caches = { + 'open': None, + 'high': None, + 'low': None, + 'close': None, + 'volume': None + } + + # The int value is used for deltas to avoid extra computation from + # creating new Timestamps. + self._one_min = pd.Timedelta('1 min').value + + def _prelude(self, dt, field): + date = dt.date() + dt_value = dt.value + cache = self._caches[field] + if cache is None or cache[0] != date: + market_open = self._market_opens.loc[date] + cache = self._caches[field] = (dt.date(), market_open, {}) + + _, market_open, entries = cache + market_open = market_open.tz_localize('UTC') + if dt != market_open: + prev_dt = dt_value - self._one_min + else: + prev_dt = None + return market_open, prev_dt, dt_value, entries + + def opens(self, assets, dt): + """ + The open field's aggregation returns the first value that occurs + for the day, if there has been no data on or before the `dt` the open + is `nan`. + + Once the first non-nan open is seen, that value remains constant per + asset for the remainder of the day. + + Returns + ------- + np.array with dtype=float64, in order of assets parameter. + """ + market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open') + + opens = [] + normalized_date = normalize_date(dt) + + for asset in assets: + if not asset._is_alive(normalized_date, True): + opens.append(np.NaN) + continue + + if prev_dt is None: + val = self._minute_reader.get_value(asset, dt, 'open') + entries[asset] = (dt_value, val) + opens.append(val) + continue + else: + try: + last_visited_dt, first_open = entries[asset] + if last_visited_dt == dt_value: + opens.append(first_open) + continue + elif not pd.isnull(first_open): + opens.append(first_open) + entries[asset] = (dt_value, first_open) + continue + else: + after_last = pd.Timestamp( + last_visited_dt + self._one_min, tz='UTC') + window = self._minute_reader.load_raw_arrays( + ['open'], + after_last, + dt, + [asset], + )[0] + nonnan = window[~pd.isnull(window)] + if len(nonnan): + val = nonnan[0] + else: + val = np.nan + entries[asset] = (dt_value, val) + opens.append(val) + continue + except KeyError: + window = self._minute_reader.load_raw_arrays( + ['open'], + market_open, + dt, + [asset], + )[0] + nonnan = window[~pd.isnull(window)] + if len(nonnan): + val = nonnan[0] + else: + val = np.nan + entries[asset] = (dt_value, val) + opens.append(val) + continue + return np.array(opens) + + def highs(self, assets, dt): + """ + The high field's aggregation returns the largest high seen between + the market open and the current dt. + If there has been no data on or before the `dt` the high is `nan`. + + Returns + ------- + np.array with dtype=float64, in order of assets parameter. + """ + market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high') + + highs = [] + normalized_date = normalize_date(dt) + + for asset in assets: + if not asset._is_alive(normalized_date, True): + highs.append(np.NaN) + continue + + if prev_dt is None: + val = self._minute_reader.get_value(asset, dt, 'high') + entries[asset] = (dt_value, val) + highs.append(val) + continue + else: + try: + last_visited_dt, last_max = entries[asset] + if last_visited_dt == dt_value: + highs.append(last_max) + continue + elif last_visited_dt == prev_dt: + curr_val = self._minute_reader.get_value( + asset, dt, 'high') + if pd.isnull(curr_val): + val = last_max + elif pd.isnull(last_max): + val = curr_val + else: + val = max(last_max, curr_val) + entries[asset] = (dt_value, val) + highs.append(val) + continue + else: + after_last = pd.Timestamp( + last_visited_dt + self._one_min, tz='UTC') + window = self._minute_reader.load_raw_arrays( + ['high'], + after_last, + dt, + [asset], + )[0].T + val = max(last_max, np.nanmax(window)) + entries[asset] = (dt_value, val) + highs.append(val) + continue + except KeyError: + window = self._minute_reader.load_raw_arrays( + ['high'], + market_open, + dt, + [asset], + )[0].T + val = np.nanmax(window) + entries[asset] = (dt_value, val) + highs.append(val) + continue + return np.array(highs) + + def lows(self, assets, dt): + """ + The low field's aggregation returns the smallest low seen between + the market open and the current dt. + If there has been no data on or before the `dt` the low is `nan`. + + Returns + ------- + np.array with dtype=float64, in order of assets parameter. + """ + market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low') + + lows = [] + normalized_date = normalize_date(dt) + + for asset in assets: + if not asset._is_alive(normalized_date, True): + lows.append(np.NaN) + continue + + if prev_dt is None: + val = self._minute_reader.get_value(asset, dt, 'low') + entries[asset] = (dt_value, val) + lows.append(val) + continue + else: + try: + last_visited_dt, last_min = entries[asset] + if last_visited_dt == dt_value: + lows.append(last_min) + continue + elif last_visited_dt == prev_dt: + curr_val = self._minute_reader.get_value( + asset, dt, 'low') + val = np.nanmin([last_min, curr_val]) + entries[asset] = (dt_value, val) + lows.append(val) + continue + else: + after_last = pd.Timestamp( + last_visited_dt + self._one_min, tz='UTC') + window = self._minute_reader.load_raw_arrays( + ['low'], + after_last, + dt, + [asset], + )[0].T + window_min = np.nanmin(window) + if pd.isnull(window_min): + val = last_min + else: + val = min(last_min, window_min) + entries[asset] = (dt_value, val) + lows.append(val) + continue + except KeyError: + window = self._minute_reader.load_raw_arrays( + ['low'], + market_open, + dt, + [asset], + )[0].T + val = np.nanmin(window) + entries[asset] = (dt_value, val) + lows.append(val) + continue + return np.array(lows) + + def closes(self, assets, dt): + """ + The close field's aggregation returns the latest close at the given + dt. + If the close for the given dt is `nan`, the most recent non-nan + `close` is used. + If there has been no data on or before the `dt` the close is `nan`. + + Returns + ------- + np.array with dtype=float64, in order of assets parameter. + """ + market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close') + + closes = [] + normalized_dt = normalize_date(dt) + + for asset in assets: + if not asset._is_alive(normalized_dt, True): + closes.append(np.NaN) + continue + + if prev_dt is None: + val = self._minute_reader.get_value(asset, dt, 'close') + entries[asset] = (dt_value, val) + closes.append(val) + continue + else: + try: + last_visited_dt, last_close = entries[asset] + if last_visited_dt == dt_value: + closes.append(last_close) + continue + elif last_visited_dt == prev_dt: + val = self._minute_reader.get_value( + asset, dt, 'close') + if pd.isnull(val): + val = last_close + entries[asset] = (dt_value, val) + closes.append(val) + continue + else: + val = self._minute_reader.get_value( + asset, dt, 'close') + if pd.isnull(val): + val = self.closes( + [asset], + pd.Timestamp(prev_dt, tz='UTC'))[0] + entries[asset] = (dt_value, val) + closes.append(val) + continue + except KeyError: + val = self._minute_reader.get_value( + asset, dt, 'close') + if pd.isnull(val): + val = self.closes([asset], + pd.Timestamp(prev_dt, tz='UTC'))[0] + entries[asset] = (dt_value, val) + closes.append(val) + continue + return np.array(closes) + + def volumes(self, assets, dt): + """ + The volume field's aggregation returns the sum of all volumes + between the market open and the `dt` + If there has been no data on or before the `dt` the volume is 0. + + Returns + ------- + np.array with dtype=int64, in order of assets parameter. + """ + market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume') + + volumes = [] + normalized_date = normalize_date(dt) + + for asset in assets: + if not asset._is_alive(normalized_date, True): + volumes.append(0) + continue + + if prev_dt is None: + val = self._minute_reader.get_value(asset, dt, 'volume') + entries[asset] = (dt_value, val) + volumes.append(val) + continue + else: + try: + last_visited_dt, last_total = entries[asset] + if last_visited_dt == dt_value: + volumes.append(last_total) + continue + elif last_visited_dt == prev_dt: + val = self._minute_reader.get_value( + asset, dt, 'volume') + val += last_total + entries[asset] = (dt_value, val) + volumes.append(val) + continue + else: + after_last = pd.Timestamp( + last_visited_dt + self._one_min, tz='UTC') + window = self._minute_reader.load_raw_arrays( + ['volume'], + after_last, + dt, + [asset], + )[0] + val = np.nansum(window) + last_total + entries[asset] = (dt_value, val) + volumes.append(val) + continue + except KeyError: + window = self._minute_reader.load_raw_arrays( + ['volume'], + market_open, + dt, + [asset], + )[0] + val = np.nansum(window) + entries[asset] = (dt_value, val) + volumes.append(val) + continue + return np.array(volumes) diff --git a/zipline/data/data_portal.py b/zipline/data/data_portal.py index 50850015..e1e7dccc 100644 --- a/zipline/data/data_portal.py +++ b/zipline/data/data_portal.py @@ -24,6 +24,7 @@ from six import iteritems from six.moves import reduce from zipline.assets import Asset, Future, Equity +from zipline.data.daily_history_aggregator import DailyHistoryAggregator from zipline.data.us_equity_pricing import NoDataOnDate from zipline.data.us_equity_loader import ( USEquityDailyHistoryLoader, @@ -59,405 +60,6 @@ OHLCVP_FIELDS = frozenset([ HISTORY_FREQUENCIES = set(["1m", "1d"]) -class DailyHistoryAggregator(object): - """ - Converts minute pricing data into a daily summary, to be used for the - last slot in a call to history with a frequency of `1d`. - - This summary is the same as a daily bar rollup of minute data, with the - distinction that the summary is truncated to the `dt` requested. - i.e. the aggregation slides forward during a the course of simulation day. - - Provides aggregation for `open`, `high`, `low`, `close`, and `volume`. - The aggregation rules for each price type is documented in their respective - - """ - - def __init__(self, market_opens, minute_reader): - self._market_opens = market_opens - self._minute_reader = minute_reader - - # The caches are structured as (date, market_open, entries), where - # entries is a dict of asset -> (last_visited_dt, value) - # - # Whenever an aggregation method determines the current value, - # the entry for the respective asset should be overwritten with a new - # entry for the current dt.value (int) and aggregation value. - # - # When the requested dt's date is different from date the cache is - # flushed, so that the cache entries do not grow unbounded. - # - # Example cache: - # cache = (date(2016, 3, 17), - # pd.Timestamp('2016-03-17 13:31', tz='UTC'), - # { - # 1: (1458221460000000000, np.nan), - # 2: (1458221460000000000, 42.0), - # }) - self._caches = { - 'open': None, - 'high': None, - 'low': None, - 'close': None, - 'volume': None - } - - # The int value is used for deltas to avoid extra computation from - # creating new Timestamps. - self._one_min = pd.Timedelta('1 min').value - - def _prelude(self, dt, field): - date = dt.date() - dt_value = dt.value - cache = self._caches[field] - if cache is None or cache[0] != date: - market_open = self._market_opens.loc[date] - cache = self._caches[field] = (dt.date(), market_open, {}) - - _, market_open, entries = cache - market_open = market_open.tz_localize('UTC') - if dt != market_open: - prev_dt = dt_value - self._one_min - else: - prev_dt = None - return market_open, prev_dt, dt_value, entries - - def opens(self, assets, dt): - """ - The open field's aggregation returns the first value that occurs - for the day, if there has been no data on or before the `dt` the open - is `nan`. - - Once the first non-nan open is seen, that value remains constant per - asset for the remainder of the day. - - Returns - ------- - np.array with dtype=float64, in order of assets parameter. - """ - market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open') - - opens = [] - normalized_date = normalize_date(dt) - - for asset in assets: - if not asset._is_alive(normalized_date, True): - opens.append(np.NaN) - continue - - if prev_dt is None: - val = self._minute_reader.get_value(asset, dt, 'open') - entries[asset] = (dt_value, val) - opens.append(val) - continue - else: - try: - last_visited_dt, first_open = entries[asset] - if last_visited_dt == dt_value: - opens.append(first_open) - continue - elif not pd.isnull(first_open): - opens.append(first_open) - entries[asset] = (dt_value, first_open) - continue - else: - after_last = pd.Timestamp( - last_visited_dt + self._one_min, tz='UTC') - window = self._minute_reader.load_raw_arrays( - ['open'], - after_last, - dt, - [asset], - )[0] - nonnan = window[~pd.isnull(window)] - if len(nonnan): - val = nonnan[0] - else: - val = np.nan - entries[asset] = (dt_value, val) - opens.append(val) - continue - except KeyError: - window = self._minute_reader.load_raw_arrays( - ['open'], - market_open, - dt, - [asset], - )[0] - nonnan = window[~pd.isnull(window)] - if len(nonnan): - val = nonnan[0] - else: - val = np.nan - entries[asset] = (dt_value, val) - opens.append(val) - continue - return np.array(opens) - - def highs(self, assets, dt): - """ - The high field's aggregation returns the largest high seen between - the market open and the current dt. - If there has been no data on or before the `dt` the high is `nan`. - - Returns - ------- - np.array with dtype=float64, in order of assets parameter. - """ - market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high') - - highs = [] - normalized_date = normalize_date(dt) - - for asset in assets: - if not asset._is_alive(normalized_date, True): - highs.append(np.NaN) - continue - - if prev_dt is None: - val = self._minute_reader.get_value(asset, dt, 'high') - entries[asset] = (dt_value, val) - highs.append(val) - continue - else: - try: - last_visited_dt, last_max = entries[asset] - if last_visited_dt == dt_value: - highs.append(last_max) - continue - elif last_visited_dt == prev_dt: - curr_val = self._minute_reader.get_value( - asset, dt, 'high') - if pd.isnull(curr_val): - val = last_max - elif pd.isnull(last_max): - val = curr_val - else: - val = max(last_max, curr_val) - entries[asset] = (dt_value, val) - highs.append(val) - continue - else: - after_last = pd.Timestamp( - last_visited_dt + self._one_min, tz='UTC') - window = self._minute_reader.load_raw_arrays( - ['high'], - after_last, - dt, - [asset], - )[0].T - val = max(last_max, np.nanmax(window)) - entries[asset] = (dt_value, val) - highs.append(val) - continue - except KeyError: - window = self._minute_reader.load_raw_arrays( - ['high'], - market_open, - dt, - [asset], - )[0].T - val = np.nanmax(window) - entries[asset] = (dt_value, val) - highs.append(val) - continue - return np.array(highs) - - def lows(self, assets, dt): - """ - The low field's aggregation returns the smallest low seen between - the market open and the current dt. - If there has been no data on or before the `dt` the low is `nan`. - - Returns - ------- - np.array with dtype=float64, in order of assets parameter. - """ - market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low') - - lows = [] - normalized_date = normalize_date(dt) - - for asset in assets: - if not asset._is_alive(normalized_date, True): - lows.append(np.NaN) - continue - - if prev_dt is None: - val = self._minute_reader.get_value(asset, dt, 'low') - entries[asset] = (dt_value, val) - lows.append(val) - continue - else: - try: - last_visited_dt, last_min = entries[asset] - if last_visited_dt == dt_value: - lows.append(last_min) - continue - elif last_visited_dt == prev_dt: - curr_val = self._minute_reader.get_value( - asset, dt, 'low') - val = np.nanmin([last_min, curr_val]) - entries[asset] = (dt_value, val) - lows.append(val) - continue - else: - after_last = pd.Timestamp( - last_visited_dt + self._one_min, tz='UTC') - window = self._minute_reader.load_raw_arrays( - ['low'], - after_last, - dt, - [asset], - )[0].T - window_min = np.nanmin(window) - if pd.isnull(window_min): - val = last_min - else: - val = min(last_min, window_min) - entries[asset] = (dt_value, val) - lows.append(val) - continue - except KeyError: - window = self._minute_reader.load_raw_arrays( - ['low'], - market_open, - dt, - [asset], - )[0].T - val = np.nanmin(window) - entries[asset] = (dt_value, val) - lows.append(val) - continue - return np.array(lows) - - def closes(self, assets, dt): - """ - The close field's aggregation returns the latest close at the given - dt. - If the close for the given dt is `nan`, the most recent non-nan - `close` is used. - If there has been no data on or before the `dt` the close is `nan`. - - Returns - ------- - np.array with dtype=float64, in order of assets parameter. - """ - market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close') - - closes = [] - normalized_dt = normalize_date(dt) - - for asset in assets: - if not asset._is_alive(normalized_dt, True): - closes.append(np.NaN) - continue - - if prev_dt is None: - val = self._minute_reader.get_value(asset, dt, 'close') - entries[asset] = (dt_value, val) - closes.append(val) - continue - else: - try: - last_visited_dt, last_close = entries[asset] - if last_visited_dt == dt_value: - closes.append(last_close) - continue - elif last_visited_dt == prev_dt: - val = self._minute_reader.get_value( - asset, dt, 'close') - if pd.isnull(val): - val = last_close - entries[asset] = (dt_value, val) - closes.append(val) - continue - else: - val = self._minute_reader.get_value( - asset, dt, 'close') - if pd.isnull(val): - val = self.closes( - [asset], - pd.Timestamp(prev_dt, tz='UTC'))[0] - entries[asset] = (dt_value, val) - closes.append(val) - continue - except KeyError: - val = self._minute_reader.get_value( - asset, dt, 'close') - if pd.isnull(val): - val = self.closes([asset], - pd.Timestamp(prev_dt, tz='UTC'))[0] - entries[asset] = (dt_value, val) - closes.append(val) - continue - return np.array(closes) - - def volumes(self, assets, dt): - """ - The volume field's aggregation returns the sum of all volumes - between the market open and the `dt` - If there has been no data on or before the `dt` the volume is 0. - - Returns - ------- - np.array with dtype=int64, in order of assets parameter. - """ - market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume') - - volumes = [] - normalized_date = normalize_date(dt) - - for asset in assets: - if not asset._is_alive(normalized_date, True): - volumes.append(0) - continue - - if prev_dt is None: - val = self._minute_reader.get_value(asset, dt, 'volume') - entries[asset] = (dt_value, val) - volumes.append(val) - continue - else: - try: - last_visited_dt, last_total = entries[asset] - if last_visited_dt == dt_value: - volumes.append(last_total) - continue - elif last_visited_dt == prev_dt: - val = self._minute_reader.get_value( - asset, dt, 'volume') - val += last_total - entries[asset] = (dt_value, val) - volumes.append(val) - continue - else: - after_last = pd.Timestamp( - last_visited_dt + self._one_min, tz='UTC') - window = self._minute_reader.load_raw_arrays( - ['volume'], - after_last, - dt, - [asset], - )[0] - val = np.nansum(window) + last_total - entries[asset] = (dt_value, val) - volumes.append(val) - continue - except KeyError: - window = self._minute_reader.load_raw_arrays( - ['volume'], - market_open, - dt, - [asset], - )[0] - val = np.nansum(window) - entries[asset] = (dt_value, val) - volumes.append(val) - continue - return np.array(volumes) - - class DataPortal(object): """Interface to all of the data that a zipline simulation needs.