MAINT: Move daily aggregator to own module.

Break out the daily history aggregator into its own module, instead of
being collocated with DataPortal.
This commit is contained in:
Eddie Hebert
2016-07-27 16:59:26 -04:00
parent e0f6abda2e
commit e00a25568d
4 changed files with 711 additions and 654 deletions
+279
View File
@@ -0,0 +1,279 @@
#
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from numbers import Real
from nose_parameterized import parameterized
from numpy.testing import assert_almost_equal
from numpy import nan
import pandas as pd
from zipline.data.data_portal import DailyHistoryAggregator
from zipline.testing.fixtures import (
WithBcolzEquityMinuteBarReader,
ZiplineTestCase,
)
OHLC = ['open', 'high', 'low', 'close']
OHLCV = OHLC + ['volume']
class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
ZiplineTestCase):
# March 2016
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
TRADING_ENV_MIN_DATE = START_DATE = pd.Timestamp(
'2016-03-01', tz='UTC',
)
TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp(
'2016-03-31', tz='UTC',
)
ASSET_FINDER_EQUITY_SIDS = 1, 2
minutes = pd.date_range('2016-03-15 9:31',
'2016-03-15 9:36',
freq='min',
tz='US/Eastern').tz_convert('UTC')
@classmethod
def make_equity_minute_bar_data(cls):
# sid data is created so that at least one high is lower than a
# previous high, and the inverse for low
yield 1, pd.DataFrame(
{
'open': [nan, 103.50, 102.50, 104.50, 101.50, nan],
'high': [nan, 103.90, 102.90, 104.90, 101.90, nan],
'low': [nan, 103.10, 102.10, 104.10, 101.10, nan],
'close': [nan, 103.30, 102.30, 104.30, 101.30, nan],
'volume': [0, 1003, 1002, 1004, 1001, 0]
},
index=cls.minutes,
)
# sid 2 is included to provide data on different bars than sid 1,
# as will as illiquidty mid-day
yield 2, pd.DataFrame(
{
'open': [201.50, nan, 204.50, nan, 200.50, 202.50],
'high': [201.90, nan, 204.90, nan, 200.90, 202.90],
'low': [201.10, nan, 204.10, nan, 200.10, 202.10],
'close': [201.30, nan, 203.50, nan, 200.30, 202.30],
'volume': [2001, 0, 2004, 0, 2000, 2002],
},
index=cls.minutes,
)
expected_values = {
1: pd.DataFrame(
{
'open': [nan, 103.50, 103.50, 103.50, 103.50, 103.50],
'high': [nan, 103.90, 103.90, 104.90, 104.90, 104.90],
'low': [nan, 103.10, 102.10, 102.10, 101.10, 101.10],
'close': [nan, 103.30, 102.30, 104.30, 101.30, 101.30],
'volume': [0, 1003, 2005, 3009, 4010, 4010]
},
index=minutes,
),
2: pd.DataFrame(
{
'open': [201.50, 201.50, 201.50, 201.50, 201.50, 201.50],
'high': [201.90, 201.90, 204.90, 204.90, 204.90, 204.90],
'low': [201.10, 201.10, 201.10, 201.10, 200.10, 200.10],
'close': [201.30, 201.30, 203.50, 203.50, 200.30, 202.30],
'volume': [2001, 2001, 4005, 4005, 6005, 8007],
},
index=minutes,
)
}
@classmethod
def init_class_fixtures(cls):
super(MinuteToDailyAggregationTestCase, cls).init_class_fixtures()
cls.EQUITIES = {
1: cls.env.asset_finder.retrieve_asset(1),
2: cls.env.asset_finder.retrieve_asset(2)
}
def init_instance_fixtures(self):
super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures()
# Set up a fresh data portal for each test, since order of calling
# needs to be tested.
self.equity_daily_aggregator = DailyHistoryAggregator(
self.trading_calendar.schedule.market_open,
self.bcolz_equity_minute_bar_reader,
)
@parameterized.expand([
('open_sid_1', 'open', 1),
('high_1', 'high', 1),
('low_1', 'low', 1),
('close_1', 'close', 1),
('volume_1', 'volume', 1),
('open_2', 'open', 2),
('high_2', 'high', 2),
('low_2', 'low', 2),
('close_2', 'close', 2),
('volume_2', 'volume', 2),
])
def test_contiguous_minutes_individual(self, name, field, sid):
# First test each minute in order.
method_name = field + 's'
results = []
repeat_results = []
asset = self.EQUITIES[sid]
for minute in self.minutes:
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
results.append(value)
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
repeat_results.append(value)
assert_almost_equal(results, self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(asset, field))
assert_almost_equal(repeat_results, self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(asset, field))
@parameterized.expand([
('open_sid_1', 'open', 1),
('high_1', 'high', 1),
('low_1', 'low', 1),
('close_1', 'close', 1),
('volume_1', 'volume', 1),
('open_2', 'open', 2),
('high_2', 'high', 2),
('low_2', 'low', 2),
('close_2', 'close', 2),
('volume_2', 'volume', 2),
])
def test_skip_minutes_individual(self, name, field, sid):
# Test skipping minutes, to exercise backfills.
# Tests initial backfill and mid day backfill.
method_name = field + 's'
for i in [1, 5]:
minute = self.minutes[i]
asset = self.EQUITIES[sid]
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
assert_almost_equal(value,
self.expected_values[sid][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
sid, field, minute))
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
assert_almost_equal(value,
self.expected_values[sid][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
sid, field, minute))
@parameterized.expand(OHLCV)
def test_contiguous_minutes_multiple(self, field):
# First test each minute in order.
method_name = field + 's'
assets = sorted(self.EQUITIES.values())
results = {asset: [] for asset in assets}
repeat_results = {asset: [] for asset in assets}
for minute in self.minutes:
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
results[asset].append(value)
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
repeat_results[asset].append(value)
for asset in assets:
assert_almost_equal(results[asset],
self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(
asset, field))
assert_almost_equal(repeat_results[asset],
self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(
asset, field))
@parameterized.expand(OHLCV)
def test_skip_minutes_multiple(self, field):
# Test skipping minutes, to exercise backfills.
# Tests initial backfill and mid day backfill.
method_name = field + 's'
assets = sorted(self.EQUITIES.values())
for i in [1, 5]:
minute = self.minutes[i]
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
assert_almost_equal(
value,
self.expected_values[asset][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
asset, field, minute))
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
assert_almost_equal(
value,
self.expected_values[asset][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
asset, field, minute))
+14 -255
View File
@@ -1,18 +1,28 @@
#
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from textwrap import dedent
from numbers import Real
from nose_parameterized import parameterized
import numpy as np
from numpy import nan
from numpy.testing import assert_almost_equal
import pandas as pd
from six import iteritems
from zipline import TradingAlgorithm
from zipline._protocol import handle_non_market_minutes
from zipline.assets import Asset
from zipline.data.data_portal import DailyHistoryAggregator
from zipline.errors import (
HistoryInInitialize,
HistoryWindowStartsBeforeData,
@@ -25,7 +35,6 @@ from zipline.testing import (
MockDailyBarReader,
)
from zipline.testing.fixtures import (
WithBcolzEquityMinuteBarReader,
WithDataPortal,
ZiplineTestCase,
alias,
@@ -33,7 +42,6 @@ from zipline.testing.fixtures import (
OHLC = ['open', 'high', 'low', 'close']
OHLCV = OHLC + ['volume']
OHLCP = OHLC + ['price']
ALL_FIELDS = OHLCP + ['volume']
@@ -1712,252 +1720,3 @@ class DailyEquityHistoryTestCase(WithHistory, ZiplineTestCase):
window_2[self.ASSET1].values)
np.testing.assert_almost_equal(window_1[self.ASSET2].values,
window_2[self.ASSET2].values)
class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
ZiplineTestCase):
# March 2016
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
TRADING_ENV_MIN_DATE = START_DATE = pd.Timestamp(
'2016-03-01', tz='UTC',
)
TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp(
'2016-03-31', tz='UTC',
)
ASSET_FINDER_EQUITY_SIDS = 1, 2
minutes = pd.date_range('2016-03-15 9:31',
'2016-03-15 9:36',
freq='min',
tz='US/Eastern').tz_convert('UTC')
@classmethod
def make_equity_minute_bar_data(cls):
# sid data is created so that at least one high is lower than a
# previous high, and the inverse for low
yield 1, pd.DataFrame(
{
'open': [nan, 103.50, 102.50, 104.50, 101.50, nan],
'high': [nan, 103.90, 102.90, 104.90, 101.90, nan],
'low': [nan, 103.10, 102.10, 104.10, 101.10, nan],
'close': [nan, 103.30, 102.30, 104.30, 101.30, nan],
'volume': [0, 1003, 1002, 1004, 1001, 0]
},
index=cls.minutes,
)
# sid 2 is included to provide data on different bars than sid 1,
# as will as illiquidty mid-day
yield 2, pd.DataFrame(
{
'open': [201.50, nan, 204.50, nan, 200.50, 202.50],
'high': [201.90, nan, 204.90, nan, 200.90, 202.90],
'low': [201.10, nan, 204.10, nan, 200.10, 202.10],
'close': [201.30, nan, 203.50, nan, 200.30, 202.30],
'volume': [2001, 0, 2004, 0, 2000, 2002],
},
index=cls.minutes,
)
expected_values = {
1: pd.DataFrame(
{
'open': [nan, 103.50, 103.50, 103.50, 103.50, 103.50],
'high': [nan, 103.90, 103.90, 104.90, 104.90, 104.90],
'low': [nan, 103.10, 102.10, 102.10, 101.10, 101.10],
'close': [nan, 103.30, 102.30, 104.30, 101.30, 101.30],
'volume': [0, 1003, 2005, 3009, 4010, 4010]
},
index=minutes,
),
2: pd.DataFrame(
{
'open': [201.50, 201.50, 201.50, 201.50, 201.50, 201.50],
'high': [201.90, 201.90, 204.90, 204.90, 204.90, 204.90],
'low': [201.10, 201.10, 201.10, 201.10, 200.10, 200.10],
'close': [201.30, 201.30, 203.50, 203.50, 200.30, 202.30],
'volume': [2001, 2001, 4005, 4005, 6005, 8007],
},
index=minutes,
)
}
@classmethod
def init_class_fixtures(cls):
super(MinuteToDailyAggregationTestCase, cls).init_class_fixtures()
cls.EQUITIES = {
1: cls.env.asset_finder.retrieve_asset(1),
2: cls.env.asset_finder.retrieve_asset(2)
}
def init_instance_fixtures(self):
super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures()
# Set up a fresh data portal for each test, since order of calling
# needs to be tested.
self.equity_daily_aggregator = DailyHistoryAggregator(
self.trading_calendar.schedule.market_open,
self.bcolz_equity_minute_bar_reader,
)
@parameterized.expand([
('open_sid_1', 'open', 1),
('high_1', 'high', 1),
('low_1', 'low', 1),
('close_1', 'close', 1),
('volume_1', 'volume', 1),
('open_2', 'open', 2),
('high_2', 'high', 2),
('low_2', 'low', 2),
('close_2', 'close', 2),
('volume_2', 'volume', 2),
])
def test_contiguous_minutes_individual(self, name, field, sid):
# First test each minute in order.
method_name = field + 's'
results = []
repeat_results = []
asset = self.EQUITIES[sid]
for minute in self.minutes:
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
results.append(value)
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
repeat_results.append(value)
assert_almost_equal(results, self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(asset, field))
assert_almost_equal(repeat_results, self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(asset, field))
@parameterized.expand([
('open_sid_1', 'open', 1),
('high_1', 'high', 1),
('low_1', 'low', 1),
('close_1', 'close', 1),
('volume_1', 'volume', 1),
('open_2', 'open', 2),
('high_2', 'high', 2),
('low_2', 'low', 2),
('close_2', 'close', 2),
('volume_2', 'volume', 2),
])
def test_skip_minutes_individual(self, name, field, sid):
# Test skipping minutes, to exercise backfills.
# Tests initial backfill and mid day backfill.
method_name = field + 's'
for i in [1, 5]:
minute = self.minutes[i]
asset = self.EQUITIES[sid]
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
assert_almost_equal(value,
self.expected_values[sid][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
sid, field, minute))
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
assert_almost_equal(value,
self.expected_values[sid][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
sid, field, minute))
@parameterized.expand(OHLCV)
def test_contiguous_minutes_multiple(self, field):
# First test each minute in order.
method_name = field + 's'
assets = sorted(self.EQUITIES.values())
results = {asset: [] for asset in assets}
repeat_results = {asset: [] for asset in assets}
for minute in self.minutes:
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
results[asset].append(value)
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
repeat_results[asset].append(value)
for asset in assets:
assert_almost_equal(results[asset],
self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(
asset, field))
assert_almost_equal(repeat_results[asset],
self.expected_values[asset][field],
err_msg='sid={0} field={1}'.format(
asset, field))
@parameterized.expand(OHLCV)
def test_skip_minutes_multiple(self, field):
# Test skipping minutes, to exercise backfills.
# Tests initial backfill and mid day backfill.
method_name = field + 's'
assets = sorted(self.EQUITIES.values())
for i in [1, 5]:
minute = self.minutes[i]
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
assert_almost_equal(
value,
self.expected_values[asset][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
asset, field, minute))
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
values = getattr(self.equity_daily_aggregator, method_name)(
assets, minute)
for j, asset in enumerate(assets):
value = values[j]
# Prevent regression on building an array when scalar is
# intended.
self.assertIsInstance(value, Real)
assert_almost_equal(
value,
self.expected_values[asset][field][i],
err_msg='sid={0} field={1} dt={2}'.format(
asset, field, minute))
+417
View File
@@ -0,0 +1,417 @@
#
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd
from pandas.tslib import normalize_date
class DailyHistoryAggregator(object):
"""
Converts minute pricing data into a daily summary, to be used for the
last slot in a call to history with a frequency of `1d`.
This summary is the same as a daily bar rollup of minute data, with the
distinction that the summary is truncated to the `dt` requested.
i.e. the aggregation slides forward during a the course of simulation day.
Provides aggregation for `open`, `high`, `low`, `close`, and `volume`.
The aggregation rules for each price type is documented in their respective
"""
def __init__(self, market_opens, minute_reader):
self._market_opens = market_opens
self._minute_reader = minute_reader
# The caches are structured as (date, market_open, entries), where
# entries is a dict of asset -> (last_visited_dt, value)
#
# Whenever an aggregation method determines the current value,
# the entry for the respective asset should be overwritten with a new
# entry for the current dt.value (int) and aggregation value.
#
# When the requested dt's date is different from date the cache is
# flushed, so that the cache entries do not grow unbounded.
#
# Example cache:
# cache = (date(2016, 3, 17),
# pd.Timestamp('2016-03-17 13:31', tz='UTC'),
# {
# 1: (1458221460000000000, np.nan),
# 2: (1458221460000000000, 42.0),
# })
self._caches = {
'open': None,
'high': None,
'low': None,
'close': None,
'volume': None
}
# The int value is used for deltas to avoid extra computation from
# creating new Timestamps.
self._one_min = pd.Timedelta('1 min').value
def _prelude(self, dt, field):
date = dt.date()
dt_value = dt.value
cache = self._caches[field]
if cache is None or cache[0] != date:
market_open = self._market_opens.loc[date]
cache = self._caches[field] = (dt.date(), market_open, {})
_, market_open, entries = cache
market_open = market_open.tz_localize('UTC')
if dt != market_open:
prev_dt = dt_value - self._one_min
else:
prev_dt = None
return market_open, prev_dt, dt_value, entries
def opens(self, assets, dt):
"""
The open field's aggregation returns the first value that occurs
for the day, if there has been no data on or before the `dt` the open
is `nan`.
Once the first non-nan open is seen, that value remains constant per
asset for the remainder of the day.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open')
opens = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
opens.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'open')
entries[asset] = (dt_value, val)
opens.append(val)
continue
else:
try:
last_visited_dt, first_open = entries[asset]
if last_visited_dt == dt_value:
opens.append(first_open)
continue
elif not pd.isnull(first_open):
opens.append(first_open)
entries[asset] = (dt_value, first_open)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['open'],
after_last,
dt,
[asset],
)[0]
nonnan = window[~pd.isnull(window)]
if len(nonnan):
val = nonnan[0]
else:
val = np.nan
entries[asset] = (dt_value, val)
opens.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['open'],
market_open,
dt,
[asset],
)[0]
nonnan = window[~pd.isnull(window)]
if len(nonnan):
val = nonnan[0]
else:
val = np.nan
entries[asset] = (dt_value, val)
opens.append(val)
continue
return np.array(opens)
def highs(self, assets, dt):
"""
The high field's aggregation returns the largest high seen between
the market open and the current dt.
If there has been no data on or before the `dt` the high is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high')
highs = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
highs.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'high')
entries[asset] = (dt_value, val)
highs.append(val)
continue
else:
try:
last_visited_dt, last_max = entries[asset]
if last_visited_dt == dt_value:
highs.append(last_max)
continue
elif last_visited_dt == prev_dt:
curr_val = self._minute_reader.get_value(
asset, dt, 'high')
if pd.isnull(curr_val):
val = last_max
elif pd.isnull(last_max):
val = curr_val
else:
val = max(last_max, curr_val)
entries[asset] = (dt_value, val)
highs.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['high'],
after_last,
dt,
[asset],
)[0].T
val = max(last_max, np.nanmax(window))
entries[asset] = (dt_value, val)
highs.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['high'],
market_open,
dt,
[asset],
)[0].T
val = np.nanmax(window)
entries[asset] = (dt_value, val)
highs.append(val)
continue
return np.array(highs)
def lows(self, assets, dt):
"""
The low field's aggregation returns the smallest low seen between
the market open and the current dt.
If there has been no data on or before the `dt` the low is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low')
lows = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
lows.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'low')
entries[asset] = (dt_value, val)
lows.append(val)
continue
else:
try:
last_visited_dt, last_min = entries[asset]
if last_visited_dt == dt_value:
lows.append(last_min)
continue
elif last_visited_dt == prev_dt:
curr_val = self._minute_reader.get_value(
asset, dt, 'low')
val = np.nanmin([last_min, curr_val])
entries[asset] = (dt_value, val)
lows.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['low'],
after_last,
dt,
[asset],
)[0].T
window_min = np.nanmin(window)
if pd.isnull(window_min):
val = last_min
else:
val = min(last_min, window_min)
entries[asset] = (dt_value, val)
lows.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['low'],
market_open,
dt,
[asset],
)[0].T
val = np.nanmin(window)
entries[asset] = (dt_value, val)
lows.append(val)
continue
return np.array(lows)
def closes(self, assets, dt):
"""
The close field's aggregation returns the latest close at the given
dt.
If the close for the given dt is `nan`, the most recent non-nan
`close` is used.
If there has been no data on or before the `dt` the close is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close')
closes = []
normalized_dt = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_dt, True):
closes.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'close')
entries[asset] = (dt_value, val)
closes.append(val)
continue
else:
try:
last_visited_dt, last_close = entries[asset]
if last_visited_dt == dt_value:
closes.append(last_close)
continue
elif last_visited_dt == prev_dt:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = last_close
entries[asset] = (dt_value, val)
closes.append(val)
continue
else:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes(
[asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
entries[asset] = (dt_value, val)
closes.append(val)
continue
except KeyError:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes([asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
entries[asset] = (dt_value, val)
closes.append(val)
continue
return np.array(closes)
def volumes(self, assets, dt):
"""
The volume field's aggregation returns the sum of all volumes
between the market open and the `dt`
If there has been no data on or before the `dt` the volume is 0.
Returns
-------
np.array with dtype=int64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume')
volumes = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
volumes.append(0)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'volume')
entries[asset] = (dt_value, val)
volumes.append(val)
continue
else:
try:
last_visited_dt, last_total = entries[asset]
if last_visited_dt == dt_value:
volumes.append(last_total)
continue
elif last_visited_dt == prev_dt:
val = self._minute_reader.get_value(
asset, dt, 'volume')
val += last_total
entries[asset] = (dt_value, val)
volumes.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['volume'],
after_last,
dt,
[asset],
)[0]
val = np.nansum(window) + last_total
entries[asset] = (dt_value, val)
volumes.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['volume'],
market_open,
dt,
[asset],
)[0]
val = np.nansum(window)
entries[asset] = (dt_value, val)
volumes.append(val)
continue
return np.array(volumes)
+1 -399
View File
@@ -24,6 +24,7 @@ from six import iteritems
from six.moves import reduce
from zipline.assets import Asset, Future, Equity
from zipline.data.daily_history_aggregator import DailyHistoryAggregator
from zipline.data.us_equity_pricing import NoDataOnDate
from zipline.data.us_equity_loader import (
USEquityDailyHistoryLoader,
@@ -59,405 +60,6 @@ OHLCVP_FIELDS = frozenset([
HISTORY_FREQUENCIES = set(["1m", "1d"])
class DailyHistoryAggregator(object):
"""
Converts minute pricing data into a daily summary, to be used for the
last slot in a call to history with a frequency of `1d`.
This summary is the same as a daily bar rollup of minute data, with the
distinction that the summary is truncated to the `dt` requested.
i.e. the aggregation slides forward during a the course of simulation day.
Provides aggregation for `open`, `high`, `low`, `close`, and `volume`.
The aggregation rules for each price type is documented in their respective
"""
def __init__(self, market_opens, minute_reader):
self._market_opens = market_opens
self._minute_reader = minute_reader
# The caches are structured as (date, market_open, entries), where
# entries is a dict of asset -> (last_visited_dt, value)
#
# Whenever an aggregation method determines the current value,
# the entry for the respective asset should be overwritten with a new
# entry for the current dt.value (int) and aggregation value.
#
# When the requested dt's date is different from date the cache is
# flushed, so that the cache entries do not grow unbounded.
#
# Example cache:
# cache = (date(2016, 3, 17),
# pd.Timestamp('2016-03-17 13:31', tz='UTC'),
# {
# 1: (1458221460000000000, np.nan),
# 2: (1458221460000000000, 42.0),
# })
self._caches = {
'open': None,
'high': None,
'low': None,
'close': None,
'volume': None
}
# The int value is used for deltas to avoid extra computation from
# creating new Timestamps.
self._one_min = pd.Timedelta('1 min').value
def _prelude(self, dt, field):
date = dt.date()
dt_value = dt.value
cache = self._caches[field]
if cache is None or cache[0] != date:
market_open = self._market_opens.loc[date]
cache = self._caches[field] = (dt.date(), market_open, {})
_, market_open, entries = cache
market_open = market_open.tz_localize('UTC')
if dt != market_open:
prev_dt = dt_value - self._one_min
else:
prev_dt = None
return market_open, prev_dt, dt_value, entries
def opens(self, assets, dt):
"""
The open field's aggregation returns the first value that occurs
for the day, if there has been no data on or before the `dt` the open
is `nan`.
Once the first non-nan open is seen, that value remains constant per
asset for the remainder of the day.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open')
opens = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
opens.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'open')
entries[asset] = (dt_value, val)
opens.append(val)
continue
else:
try:
last_visited_dt, first_open = entries[asset]
if last_visited_dt == dt_value:
opens.append(first_open)
continue
elif not pd.isnull(first_open):
opens.append(first_open)
entries[asset] = (dt_value, first_open)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['open'],
after_last,
dt,
[asset],
)[0]
nonnan = window[~pd.isnull(window)]
if len(nonnan):
val = nonnan[0]
else:
val = np.nan
entries[asset] = (dt_value, val)
opens.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['open'],
market_open,
dt,
[asset],
)[0]
nonnan = window[~pd.isnull(window)]
if len(nonnan):
val = nonnan[0]
else:
val = np.nan
entries[asset] = (dt_value, val)
opens.append(val)
continue
return np.array(opens)
def highs(self, assets, dt):
"""
The high field's aggregation returns the largest high seen between
the market open and the current dt.
If there has been no data on or before the `dt` the high is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high')
highs = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
highs.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'high')
entries[asset] = (dt_value, val)
highs.append(val)
continue
else:
try:
last_visited_dt, last_max = entries[asset]
if last_visited_dt == dt_value:
highs.append(last_max)
continue
elif last_visited_dt == prev_dt:
curr_val = self._minute_reader.get_value(
asset, dt, 'high')
if pd.isnull(curr_val):
val = last_max
elif pd.isnull(last_max):
val = curr_val
else:
val = max(last_max, curr_val)
entries[asset] = (dt_value, val)
highs.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['high'],
after_last,
dt,
[asset],
)[0].T
val = max(last_max, np.nanmax(window))
entries[asset] = (dt_value, val)
highs.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['high'],
market_open,
dt,
[asset],
)[0].T
val = np.nanmax(window)
entries[asset] = (dt_value, val)
highs.append(val)
continue
return np.array(highs)
def lows(self, assets, dt):
"""
The low field's aggregation returns the smallest low seen between
the market open and the current dt.
If there has been no data on or before the `dt` the low is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low')
lows = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
lows.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'low')
entries[asset] = (dt_value, val)
lows.append(val)
continue
else:
try:
last_visited_dt, last_min = entries[asset]
if last_visited_dt == dt_value:
lows.append(last_min)
continue
elif last_visited_dt == prev_dt:
curr_val = self._minute_reader.get_value(
asset, dt, 'low')
val = np.nanmin([last_min, curr_val])
entries[asset] = (dt_value, val)
lows.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['low'],
after_last,
dt,
[asset],
)[0].T
window_min = np.nanmin(window)
if pd.isnull(window_min):
val = last_min
else:
val = min(last_min, window_min)
entries[asset] = (dt_value, val)
lows.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['low'],
market_open,
dt,
[asset],
)[0].T
val = np.nanmin(window)
entries[asset] = (dt_value, val)
lows.append(val)
continue
return np.array(lows)
def closes(self, assets, dt):
"""
The close field's aggregation returns the latest close at the given
dt.
If the close for the given dt is `nan`, the most recent non-nan
`close` is used.
If there has been no data on or before the `dt` the close is `nan`.
Returns
-------
np.array with dtype=float64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close')
closes = []
normalized_dt = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_dt, True):
closes.append(np.NaN)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'close')
entries[asset] = (dt_value, val)
closes.append(val)
continue
else:
try:
last_visited_dt, last_close = entries[asset]
if last_visited_dt == dt_value:
closes.append(last_close)
continue
elif last_visited_dt == prev_dt:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = last_close
entries[asset] = (dt_value, val)
closes.append(val)
continue
else:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes(
[asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
entries[asset] = (dt_value, val)
closes.append(val)
continue
except KeyError:
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes([asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
entries[asset] = (dt_value, val)
closes.append(val)
continue
return np.array(closes)
def volumes(self, assets, dt):
"""
The volume field's aggregation returns the sum of all volumes
between the market open and the `dt`
If there has been no data on or before the `dt` the volume is 0.
Returns
-------
np.array with dtype=int64, in order of assets parameter.
"""
market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume')
volumes = []
normalized_date = normalize_date(dt)
for asset in assets:
if not asset._is_alive(normalized_date, True):
volumes.append(0)
continue
if prev_dt is None:
val = self._minute_reader.get_value(asset, dt, 'volume')
entries[asset] = (dt_value, val)
volumes.append(val)
continue
else:
try:
last_visited_dt, last_total = entries[asset]
if last_visited_dt == dt_value:
volumes.append(last_total)
continue
elif last_visited_dt == prev_dt:
val = self._minute_reader.get_value(
asset, dt, 'volume')
val += last_total
entries[asset] = (dt_value, val)
volumes.append(val)
continue
else:
after_last = pd.Timestamp(
last_visited_dt + self._one_min, tz='UTC')
window = self._minute_reader.load_raw_arrays(
['volume'],
after_last,
dt,
[asset],
)[0]
val = np.nansum(window) + last_total
entries[asset] = (dt_value, val)
volumes.append(val)
continue
except KeyError:
window = self._minute_reader.load_raw_arrays(
['volume'],
market_open,
dt,
[asset],
)[0]
val = np.nansum(window)
entries[asset] = (dt_value, val)
volumes.append(val)
continue
return np.array(volumes)
class DataPortal(object):
"""Interface to all of the data that a zipline simulation needs.