ENH: Allows history to be dynamic and grow the container at runtime.

Previously, all specs had to be pre-allocated by using the 'add_history'
function. This is now no longer required and instead serves as a hint to
the HistoryContainer to pre-allocate the space for the given spec.

History can grow by increasing the length for a frequency, adding a
frequency, or adding a field. It can grow with any combination of
these.

HistoryContainer now is aware of the data_frequency of the algorithm,
and no longer uses the daily_at_midnight flag; instead, this is the
default behavior.
This commit is contained in:
Joe Jevnik
2014-10-06 13:37:09 -04:00
parent 1c08f60f7c
commit f8f7f2fc4c
9 changed files with 963 additions and 180 deletions
+5 -9
View File
@@ -534,9 +534,7 @@ HISTORY_CONTAINER_TEST_CASES = {
to_utc('2013-06-28 9:31AM'),
],
# Missing volume data should manifest as 0's rather
# than nans.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [np.nan, 0, 1],
@@ -547,7 +545,7 @@ HISTORY_CONTAINER_TEST_CASES = {
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
@@ -560,11 +558,7 @@ HISTORY_CONTAINER_TEST_CASES = {
to_utc('2013-06-28 9:33AM'),
],
# Note: Calling fillna() here even though there are
# no NaNs because this makes it less likely
# for us to introduce a stupid bug by
# copy/pasting in the future.
).fillna(0 if 'volume' in key else np.nan),
),
pd.DataFrame(
data={
1: [2, np.nan, 3],
@@ -575,6 +569,8 @@ HISTORY_CONTAINER_TEST_CASES = {
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
# For volume, when we are missing data, we replace
# it with 0s to show that no trades occured.
).fillna(0 if 'volume' in key else np.nan),
],
)
+1 -2
View File
@@ -122,8 +122,7 @@ class TestChangeOfSids(TestCase):
for sid in self.sids[:i]:
self.assertIn(sid, df.columns)
last_elem = len(df) - 1
self.assertEqual(df[last_elem][last_elem], last_elem)
self.assertEqual(df.iloc[-1].iloc[-1], i)
class TestBatchTransformMinutely(TestCase):
+269 -7
View File
@@ -14,18 +14,24 @@
# limitations under the License.
from unittest import TestCase
from itertools import product
from textwrap import dedent
from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from pandas.util.testing import assert_frame_equal
from zipline.history import history
from zipline.history import history, Frequency
from zipline.history.history_container import HistoryContainer
from zipline.protocol import BarData
import zipline.utils.factory as factory
from zipline import TradingAlgorithm
from zipline.finance.trading import SimulationParameters, TradingEnvironment
from zipline.finance.trading import (
SimulationParameters,
TradingEnvironment,
with_environment,
)
from zipline.errors import IncompatibleHistoryFrequency
from zipline.sources import RandomWalkSource, DataFrameSource
@@ -131,7 +137,6 @@ def get_index_at_dt(case_input):
case_input['frequency'],
None,
False,
daily_at_midnight=False,
data_frequency='minute',
)
return history.index_at_dt(history_spec, case_input['algo_dt'])
@@ -197,7 +202,7 @@ class TestHistoryContainer(TestCase):
self.assertEqual(len(expected[spec.key_str]), len(updates))
container = HistoryContainer(
{spec.key_str: spec for spec in specs}, sids, dt
{spec.key_str: spec for spec in specs}, sids, dt, 'minute',
)
for update_count, update in enumerate(updates):
@@ -222,7 +227,7 @@ class TestHistoryContainer(TestCase):
frequency='1d',
field='price',
ffill=True,
daily_at_midnight=False
data_frequency='minute'
)
specs = {spec.key_str: spec}
initial_sids = [1, ]
@@ -230,7 +235,8 @@ class TestHistoryContainer(TestCase):
'2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC')
container = HistoryContainer(
specs, initial_sids, initial_dt)
specs, initial_sids, initial_dt, 'minute'
)
bar_data = BarData()
container.update(bar_data, initial_dt)
@@ -373,7 +379,7 @@ def handle_data(context, data):
end = pd.Timestamp('2006-03-30', tz='UTC')
sim_params = factory.create_simulation_parameters(
start=start, end=end)
start=start, end=end, data_frequency='daily')
_, df = factory.create_test_df_source(sim_params)
df = df.astype(np.float64)
@@ -867,3 +873,259 @@ def handle_data(context, data):
# Depends on seed
np.testing.assert_almost_equal(recorded_ma,
159.76304468946876)
def test_history_container_constructed_at_runtime(self):
algo_text = dedent(
"""\
from zipline.api import history
def handle_data(context, data):
context.prices = history(2, '1d', 'price')
"""
)
start = pd.Timestamp('2007-04-05', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='daily'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start, end=end)
self.assertIsNone(test_algo.history_container)
test_algo.run(source)
self.assertIsNotNone(
test_algo.history_container,
msg='HistoryContainer was not constructed at runtime',
)
container = test_algo.history_container
self.assertEqual(
container.buffer_panel.window_length,
Frequency.MAX_MINUTES['d'],
msg='HistoryContainer.buffer_panel was not large enough to service'
' the given HistorySpec',
)
self.assertEqual(
len(container.digest_panels),
1,
msg='The HistoryContainer created too many digest panels',
)
freq, digest = list(container.digest_panels.items())[0]
self.assertEqual(
freq.unit_str,
'd',
)
self.assertEqual(
digest.window_length,
1,
msg='The digest panel is not large enough to service the given'
' HistorySpec',
)
class TestHistoryContainerResize(TestCase):
@parameterized.expand(
(freq, field, data_frequency, construct_digest)
for freq in ('1m', '1d')
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
for construct_digest in (True, False)
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_grow_length(self,
freq,
field,
data_frequency,
construct_digest):
bar_count = 2 if construct_digest else 1
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)
container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)
if construct_digest:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)
bar_data = BarData()
container.update(bar_data, initial_dt)
to_add = (
history.HistorySpec(
bar_count=bar_count + 1,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
history.HistorySpec(
bar_count=bar_count + 2,
frequency=freq,
field=field,
ffill=True,
data_frequency=data_frequency,
),
)
for spec in to_add:
container.ensure_spec(spec, initial_dt)
self.assertEqual(
container.digest_panels[spec.frequency].window_length,
spec.bar_count - 1,
)
self.assert_history(container, spec, initial_dt)
@parameterized.expand(
(bar_count, freq, pair, data_frequency)
for bar_count in (1, 2)
for freq in ('1m', '1d')
for pair in product(HistoryContainer.VALID_FIELDS, repeat=2)
for data_frequency in ('minute', 'daily')
if not (freq == '1m' and data_frequency == 'daily')
)
def test_history_add_field(self, bar_count, freq, pair, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=freq,
field=first,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)
container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)
if bar_count > 1:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)
bar_data = BarData()
container.update(bar_data, initial_dt)
new_spec = history.HistorySpec(
bar_count,
frequency=freq,
field=second,
ffill=True,
data_frequency=data_frequency,
)
container.ensure_spec(new_spec, initial_dt)
if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(digest_panel.window_length, bar_count - 1)
self.assertIn(second, digest_panel.items)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)
self.assert_history(container, new_spec, initial_dt)
@parameterized.expand(
(bar_count, pair, field, data_frequency)
for bar_count in (1, 2)
for pair in product(('1m', '1d'), repeat=2)
for field in HistoryContainer.VALID_FIELDS
for data_frequency in ('minute', 'daily')
if not ('1m' in pair and data_frequency == 'daily')
)
def test_history_add_freq(self, bar_count, pair, field, data_frequency):
first, second = pair
spec = history.HistorySpec(
bar_count=bar_count,
frequency=first,
field=field,
ffill=True,
data_frequency=data_frequency,
)
specs = {spec.key_str: spec}
initial_sids = [1]
initial_dt = pd.Timestamp(
'2013-06-28 13:31AM'
if data_frequency == 'minute'
else '2013-06-28 12:00AM',
tz='UTC',
)
container = HistoryContainer(
specs, initial_sids, initial_dt, data_frequency,
)
if bar_count > 1:
self.assertEqual(
container.digest_panels[spec.frequency].window_length, 1,
)
bar_data = BarData()
container.update(bar_data, initial_dt)
new_spec = history.HistorySpec(
bar_count,
frequency=second,
field=field,
ffill=True,
data_frequency=data_frequency,
)
container.ensure_spec(new_spec, initial_dt)
if bar_count > 1:
digest_panel = container.digest_panels[new_spec.frequency]
self.assertEqual(digest_panel.window_length, bar_count - 1)
else:
self.assertNotIn(new_spec.frequency, container.digest_panels)
self.assert_history(container, new_spec, initial_dt)
@with_environment()
def assert_history(self, container, spec, dt, env=None):
hst = container.get_history(spec, dt)
self.assertEqual(len(hst), spec.bar_count)
back = spec.frequency.prev_bar
for n in reversed(hst.index):
self.assertEqual(dt, n)
dt = back(dt)
+52 -11
View File
@@ -181,6 +181,9 @@ class TradingAlgorithm(object):
self._portfolio = None
self._account = None
self.history_container_class = kwargs.pop(
'history_container_class', HistoryContainer,
)
self.history_container = None
self.history_specs = {}
@@ -435,11 +438,13 @@ class TradingAlgorithm(object):
self.sim_params._update_internal()
# Create history containers
if len(self.history_specs) != 0:
self.history_container = HistoryContainer(
if self.history_specs:
self.history_container = self.history_container_class(
self.history_specs,
self.sim_params.sids,
self.sim_params.first_open)
self.sim_params.first_open,
self.sim_params.data_frequency,
)
# Create transforms by wrapping them into StatefulTransforms
self.transforms = []
@@ -912,21 +917,54 @@ class TradingAlgorithm(object):
self.blotter.cancel(order_id)
@api_method
def add_history(self, bar_count, frequency, field,
ffill=True):
def add_history(self, bar_count, frequency, field, ffill=True):
data_frequency = self.sim_params.data_frequency
daily_at_midnight = (data_frequency == 'daily')
history_spec = HistorySpec(bar_count, frequency, field, ffill,
daily_at_midnight=daily_at_midnight,
data_frequency=data_frequency)
self.history_specs[history_spec.key_str] = history_spec
if self.initialized:
if self.history_container:
self.history_container.ensure_spec(history_spec, self.datetime)
else:
self.history_container = self.history_container_class(
self.trade_sources.history_backfill,
self.history_specs,
self.multiverse.current_sids,
self.sim_params.first_open,
self.sim_params.data_frequency,
)
def get_history_spec(self, bar_count, frequency, field, ffill):
spec_key = HistorySpec.spec_key(bar_count, frequency, field, ffill)
if spec_key not in self.history_specs:
data_freq = self.sim_params.data_frequency
spec = HistorySpec(
bar_count,
frequency,
field,
ffill,
data_frequency=data_freq,
)
self.history_specs[spec_key] = spec
if not self.history_container:
self.history_container = self.history_container_class(
self.history_specs,
self.current_universe(),
self.datetime,
self.sim_params.data_frequency,
shift_digest=True,
)
self.history_container.ensure_spec(spec, self.datetime)
return self.history_specs[spec_key]
@api_method
def history(self, bar_count, frequency, field, ffill=True):
spec_key_str = HistorySpec.spec_key(
bar_count, frequency, field, ffill)
history_spec = self.history_specs[spec_key_str]
history_spec = self.get_history_spec(
bar_count,
frequency,
field,
ffill,
)
return self.history_container.get_history(history_spec, self.datetime)
####################
@@ -994,6 +1032,9 @@ class TradingAlgorithm(object):
"""
self.register_trading_control(LongOnly())
def current_universe(self):
return self.sim_params.sids
@classmethod
def all_api_methods(cls):
"""
+31
View File
@@ -16,6 +16,7 @@
import bisect
import logbook
import datetime
from functools import wraps
import pandas as pd
import numpy as np
@@ -451,3 +452,33 @@ class SimulationParameters(object):
emission_rate=self.emission_rate,
first_open=self.first_open,
last_close=self.last_close)
def with_environment(asname='env'):
"""
Decorator to automagically pass TradingEnvironment to the function
under the name asname. If the environment is passed explicitly as a keyword
then the explicitly passed value will be used instead.
usage:
with_environment()
def f(env=None):
pass
with_environment(asname='my_env')
def g(my_env=None):
pass
"""
def with_environment_decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# inject env into the namespace for the function.
# This doesn't use setdefault so that grabbing the trading env
# is lazy.
if asname not in kwargs:
kwargs[asname] = TradingEnvironment.instance()
return f(*args, **kwargs)
return wrapper
return with_environment_decorator
+4 -2
View File
@@ -16,7 +16,8 @@
from . history import (
HistorySpec,
days_index_at_dt,
index_at_dt
index_at_dt,
Frequency,
)
from . import history_container
@@ -25,5 +26,6 @@ __all__ = [
'HistorySpec',
'days_index_at_dt',
'index_at_dt',
'history_container'
'history_container',
'Frequency',
]
+64 -43
View File
@@ -20,6 +20,7 @@ import pandas as pd
import re
from zipline.finance import trading
from zipline.finance.trading import with_environment
from zipline.errors import IncompatibleHistoryFrequency
@@ -42,8 +43,9 @@ class Frequency(object):
"""
SUPPORTED_FREQUENCIES = frozenset({'1d', '1m'})
MAX_MINUTES = {'m': 1, 'd': 390}
MAX_DAYS = {'d': 1}
def __init__(self, freq_str, daily_at_midnight=False):
def __init__(self, freq_str, data_frequency):
if freq_str not in self.SUPPORTED_FREQUENCIES:
raise ValueError(
@@ -58,7 +60,7 @@ class Frequency(object):
# unit_str - The unit type, e.g. 'd'
self.num, self.unit_str = parse_freq_str(freq_str)
self.daily_at_midnight = daily_at_midnight
self.data_frequency = data_frequency
def next_window_start(self, previous_window_close):
"""
@@ -67,22 +69,22 @@ class Frequency(object):
"""
if self.unit_str == 'd':
return self.next_day_window_start(previous_window_close,
self.daily_at_midnight)
self.data_frequency)
elif self.unit_str == 'm':
return self.next_minute_window_start(previous_window_close)
@staticmethod
def next_day_window_start(previous_window_close, daily_at_midnight=False):
def next_day_window_start(previous_window_close, data_frequency='minute'):
"""
Get the next day window start after @previous_window_close. This is
defined as the first market open strictly greater than
@previous_window_close.
"""
env = trading.environment
if daily_at_midnight:
if data_frequency == 'daily':
next_open = env.next_trading_day(previous_window_close)
else:
next_open, _ = env.next_open_and_close(previous_window_close)
next_open = env.next_market_minute(previous_window_close)
return next_open
@staticmethod
@@ -128,7 +130,7 @@ class Frequency(object):
offset=-(num_days - 1)
).market_open.iloc[0]
if self.daily_at_midnight:
if self.data_frequency == 'daily':
open_ = pd.tslib.normalize_date(open_)
return open_
@@ -150,44 +152,23 @@ class Frequency(object):
def day_window_close(self, window_start, num_days):
"""
Get the last minute for a daily window of length @num_days with first
minute @window_start. This is calculated by searching forward until
@num_days market closes are encountered.
Get the window close for a daily frequency.
If the data_frequency is minute, then this will be the last minute of
last day of the window.
Examples:
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 1
--> window_close = Thursday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 3:59 AM EST
num_days = 1
--> window_close = Thursday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 2
--> window_close = Friday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 3
--> window_close = Monday March 6th, 2006, 4:00 PM EST
# Day before July 4th is an early close
window_start = Wednesday July 3rd, 2013, 9:31 AM EST
num_days = 1
--> window_close = Wednesday July 3rd, 2013, 1:00 PM EST
If the data_frequency is minute, this will be midnight utc of the last
day of the window.
"""
env = trading.environment
close = env.open_close_window(
window_start,
1,
offset=num_days - 1
).market_close.iloc[0]
if self.daily_at_midnight:
close = pd.tslib.normalize_date(close)
if self.data_frequency != 'daily':
return env.get_open_and_close(
env.add_trading_days(num_days - 1, window_start),
)[1]
return close
return pd.tslib.normalize_date(
env.add_trading_days(num_days - 1, window_start),
)
def minute_window_close(self, window_start, num_minutes):
"""
@@ -204,13 +185,53 @@ class Frequency(object):
env = trading.environment
return env.market_minute_window(window_start, count=num_minutes)[-1]
@with_environment()
def prev_bar(self, dt, env=None):
"""
Returns the previous bar for dt.
"""
if self.unit_str == 'd':
if self.data_frequency == 'minute':
func = lambda dt: env.get_open_and_close(
env.previous_trading_day(dt),
)[1]
else:
func = env.previous_trading_day
else:
func = env.previous_market_minute
# Cache the function dispatch.
self.prev_bar = func
return func(dt)
@property
def max_bars(self):
if self.data_frequency == 'daily':
return self.max_days
else:
return self.max_minutes
@property
def max_days(self):
if self.data_frequency != 'daily':
raise ValueError('max_days requested in minute mode')
return self.MAX_DAYS[self.unit_str] * self.num
@property
def max_minutes(self):
"""
The maximum number of minutes required to roll a bar at this frequency.
"""
if self.data_frequency != 'minute':
raise ValueError('max_minutes requested in daily mode')
return self.MAX_MINUTES[self.unit_str] * self.num
def normalize(self, dt):
if self.data_frequency != 'daily':
return dt
return pd.tslib.normalize_date(dt)
def __eq__(self, other):
return self.freq_str == other.freq_str
@@ -242,12 +263,12 @@ class HistorySpec(object):
bar_count, freq_str, field, ffill)
def __init__(self, bar_count, frequency, field, ffill,
daily_at_midnight=False, data_frequency='daily'):
data_frequency='daily'):
# Number of bars to look back.
self.bar_count = bar_count
if isinstance(frequency, str):
frequency = Frequency(frequency, daily_at_midnight)
frequency = Frequency(frequency, data_frequency)
if frequency.unit_str == 'm' and data_frequency == 'daily':
raise IncompatibleHistoryFrequency(
frequency=frequency.unit_str,
@@ -293,7 +314,7 @@ def days_index_at_dt(history_spec, algo_dt):
step=history_spec.frequency.num,
).market_close
if history_spec.frequency.daily_at_midnight:
if history_spec.frequency.data_frequency == 'daily':
market_closes = market_closes.apply(pd.tslib.normalize_date)
# Append the current algo_dt as the last index value.
+381 -70
View File
@@ -12,18 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from bisect import insort_left
from collections import namedtuple
from itertools import groupby, product
import logbook
import numpy as np
import pandas as pd
from six import itervalues, iteritems, iterkeys
from . history import (
index_at_dt,
HistorySpec,
)
from . history import HistorySpec
from zipline.finance.trading import with_environment
from zipline.utils.data import RollingPanel, _ensure_index
logger = logbook.Logger('History Container')
@@ -43,7 +43,6 @@ def ffill_buffer_from_prior_values(freq,
Forward-fill a buffer frame, falling back to the end-of-period values of a
digest frame if the buffer frame has leading NaNs.
"""
nan_sids = buffer_frame.iloc[0].isnull()
if any(nan_sids) and len(digest_frame):
# If we have any leading nans in the buffer and we have a non-empty
@@ -88,33 +87,84 @@ def freq_str_and_bar_count(history_spec):
return (history_spec.frequency.freq_str, history_spec.bar_count)
def group_by_frequency(history_specs):
@with_environment()
def next_bar(spec, env):
"""
Takes an iterable of history specs and returns a dictionary mapping unique
frequencies to a list of specs with that frequency.
Within each list, the HistorySpecs are sorted by ascending bar count.
Example:
[HistorySpec(3, '1d', 'price', True),
HistorySpec(2, '2d', 'open', True),
HistorySpec(2, '1d', 'open', False),
HistorySpec(5, '1m', 'open', True)]
yields
{Frequency('1d') : [HistorySpec(2, '1d', 'open', False)],
HistorySpec(3, '1d', 'price', True),
Frequency('2d') : [HistorySpec(2, '2d', 'open', True)],
Frequency('1m') : [HistorySpec(5, '1m', 'open', True)]}
Returns a function that will return the next bar for a given datetime.
"""
return {key: list(group)
if spec.frequency.unit_str == 'd':
if spec.frequency.data_frequency == 'minute':
return lambda dt: env.get_open_and_close(
env.next_trading_day(dt),
)[1]
else:
return env.next_trading_day
else:
return env.next_market_minute
def compute_largest_specs(history_specs):
"""
Maps a Frequency to the largest HistorySpec at that frequency from an
iterable of HistorySpecs.
"""
return {key: max(group, key=lambda f: f.bar_count)
for key, group in groupby(
sorted(history_specs, key=freq_str_and_bar_count),
key=lambda spec: spec.frequency)}
# tuples to store a change to the shape of a HistoryContainer
FrequencyDelta = namedtuple(
'FrequencyDelta',
['freq', 'buffer_delta'],
)
LengthDelta = namedtuple(
'LengthDelta',
['freq', 'delta'],
)
HistoryContainerDeltaSuper = namedtuple(
'HistoryContainerDelta',
['field', 'frequency_delta', 'length_delta'],
)
class HistoryContainerDelta(HistoryContainerDeltaSuper):
"""
A class representing a resize of the history container.
"""
def __new__(cls, field=None, frequency_delta=None, length_delta=None):
"""
field is a new field that was added.
frequency is a FrequencyDelta representing a new frequency was added.
length is a bar LengthDelta which is a frequency and a bar_count.
If any field is None, then no change occurred of that type.
"""
return super(HistoryContainerDelta, cls).__new__(
cls, field, frequency_delta, length_delta,
)
@property
def empty(self):
"""
Checks if the delta is empty.
"""
return (self.field is None
and self.frequency_delta is None
and self.length_delta is None)
def normalize_to_data_freq(data_frequency, dt):
if data_frequency == 'minute':
return dt
return pd.tslib.normalize_date(dt)
class HistoryContainer(object):
"""
Container for all history panels and frames used by an algoscript.
@@ -124,33 +174,64 @@ class HistoryContainer(object):
Entry point for the algoscript is the result of `get_history`.
"""
VALID_FIELDS = {
'price', 'open_price', 'volume', 'high', 'low', 'close_price',
}
def __init__(self, history_specs, initial_sids, initial_dt):
def __init__(self,
history_specs,
initial_sids,
initial_dt,
data_frequency,
shift_digest=False):
"""
A container to hold a rolling window of historical data within a user's
algorithm.
Args:
history_specs (dict[Frequency:HistorySpec]): The starting history
specs that this container should be able to service.
initial_sids (set[Security or Int]): The starting sids to watch.
initial_dt (datetime): The datetime to start collecting history from.
shift_digest (bool): If True, then the digest panels will be created
shifted back by one bar, this is to facilitate the creation of a
HistoryContainer during a call to handle_data within
TradingAlgorithm. This is False by default.
Returns:
An instance of a new HistoryContainer
"""
# History specs to be served by this container.
self.history_specs = history_specs
self.frequency_groups = \
group_by_frequency(itervalues(self.history_specs))
self.largest_specs = compute_largest_specs(
itervalues(self.history_specs)
)
# The set of fields specified by all history specs
self.fields = pd.Index(
sorted(set(spec.field for spec in itervalues(history_specs)))
)
self.sids = pd.Index(
sorted(set(initial_sids))
sorted(set(initial_sids or []))
)
self.data_frequency = data_frequency
initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt)
# This panel contains raw minutes for periods that haven't been fully
# completed. When a frequency period rolls over, these minutes are
# digested using some sort of aggregation call on the panel (e.g. `sum`
# for volume, `max` for high, `min` for low, etc.).
self.buffer_panel = self.create_buffer_panel(
initial_dt,
)
self.buffer_panel = self.create_buffer_panel(initial_dt)
# Dictionaries with Frequency objects as keys.
self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
self.create_digest_panels(initial_sids, initial_dt)
self.create_digest_panels(initial_sids, initial_dt, shift_digest)
# Helps prop up the prior day panel against having a nan, when the data
# has been seen.
@@ -203,7 +284,232 @@ class HistoryContainer(object):
Return an iterator over all the unique frequencies serviced by this
container.
"""
return iterkeys(self.frequency_groups)
return iterkeys(self.largest_specs)
@with_environment()
def _add_frequency(self, spec, dt, env=None):
"""
Adds a new frequency to the container. This reshapes the buffer_panel
if needed.
"""
freq = spec.frequency
self.largest_specs[freq] = spec
new_buffer_len = 0
if freq.max_bars > self.buffer_panel.window_length:
# More bars need to be held in the buffer_panel to support this
# freq
if freq.data_frequency \
!= self.buffer_spec.frequency.data_frequency:
# If the data_frequencies are not the same, then we need to
# create a fresh buffer.
self.buffer_panel = self.create_buffer_panel(
dt, shift_digest=True,
)
new_buffer_len = None
else:
# The frequencies are the same, we just need to add more bars.
self._resize_panel(
self.buffer_panel,
freq.max_bars,
dt,
self.buffer_spec.frequency,
)
new_buffer_len = freq.max_minutes
# update the current buffer_spec to reflect the new lenght.
self.buffer_spec.bar_count = new_buffer_len + 1
if spec.bar_count > 1:
# This spec has more than one bar, construct a digest panel for it.
self.digest_panels[freq] = self._create_digest_panel(
dt, spec=spec, env=env,
)
else:
self.cur_window_starts[freq] = dt
self.cur_window_closes[freq] = freq.window_close(
self.cur_window_starts[freq]
)
self.last_known_prior_values = self.last_known_prior_values.reindex(
index=self.prior_values_index,
)
return FrequencyDelta(freq, new_buffer_len)
def _add_field(self, field):
"""
Adds a new field to the container.
"""
# self.fields is already sorted, so we just need to insert the new
# field in the correct index.
ls = list(self.fields)
insort_left(ls, field)
self.fields = pd.Index(ls)
self._realign_fields()
self.last_known_prior_values = self.last_known_prior_values.reindex(
index=self.prior_values_index,
)
return field
@with_environment()
def _add_length(self, spec, dt, env=None):
"""
Increases the length of the digest panel for spec.frequency. If this
does not have a panel, and one is needed; a digest panel will be
constructed.
"""
old_count = self.largest_specs[spec.frequency].bar_count
self.largest_specs[spec.frequency] = spec
delta = spec.bar_count - old_count
panel = self.digest_panels.get(spec.frequency)
if panel is None:
# The old length for this frequency was 1 bar, meaning no digest
# panel was held. We must construct a new one here.
panel = self._create_digest_panel(
dt, spec=spec, env=env,
)
else:
self._resize_panel(
panel, spec.bar_count - 1, dt, freq=spec.frequency, env=env,
)
self.digest_panels[spec.frequency] = panel
return LengthDelta(spec.frequency, delta)
@with_environment()
def _resize_panel(self, panel, size, dt, freq, env=None):
"""
Resizes a panel, fills the date_buf with the correct values.
"""
# This is the oldest datetime that will be shown in the current window
# of the panel.
oldest_idx = panel._oldest_frame_idx
oldest_dt = pd.Timestamp(
panel.date_buf[oldest_idx], tz='utc',
)
old_cap = panel.cap
panel.resize(size)
delta = (old_cap - oldest_idx) - panel._oldest_frame_idx
# Backfill the missing dates of the new current window.
missing_dts = self._create_window_date_buf(
delta, freq.unit_str, freq.data_frequency, oldest_dt,
)
# Fill the dates in between the new oldest index and adjusted oldest
# index.
where = slice(panel._oldest_frame_idx, -(old_cap - oldest_idx))
panel.date_buf[where] = missing_dts
@with_environment()
def _create_window_date_buf(self,
window,
unit_str,
data_frequency,
dt,
env=None):
"""
Creates a window length date_buf looking backwards from dt.
"""
if unit_str == 'd':
# Get the properly key'd datetime64 out of the pandas Timestamp
if data_frequency != 'daily':
arr = env.open_close_window(
dt,
window,
offset=-window,
).market_close.astype('datetime64[ns]').values
else:
arr = env.open_close_window(
dt,
window,
offset=-window,
).index.values
return arr
else:
return env.market_minute_window(
env.previous_market_minute(dt),
window,
step=-1,
)[::-1].values
@with_environment()
def _create_panel(self, dt, spec, env=None):
"""
Constructs a rolling panel with a properly aligned date_buf.
"""
dt = normalize_to_data_freq(spec.frequency.data_frequency, dt)
window = spec.bar_count - 1
# everything after dt is going to be filled from calling update, no
# need to precompute these dates.
second = np.empty(window, dtype='datetime64[ns]')
date_buf = np.hstack(
(self._create_window_date_buf(
window,
spec.frequency.unit_str,
spec.frequency.data_frequency,
dt,
env=env,
), second),
)
panel = RollingPanel(
window=window,
items=self.fields,
sids=self.sids,
date_buf=date_buf,
)
return panel
@with_environment()
def _create_digest_panel(self,
dt,
spec,
window_starts=None,
window_closes=None,
env=None):
"""
Creates a digest panel, setting the window_starts and window_closes.
If window_starts or window_closes are None, then self.cur_window_starts
or self.cur_window_closes will be used.
"""
freq = spec.frequency
window_starts = window_starts if window_starts is not None \
else self.cur_window_starts
window_closes = window_closes if window_closes is not None \
else self.cur_window_closes
window_starts[freq] = freq.normalize(dt)
window_closes[freq] = freq.window_close(window_starts[freq])
return self._create_panel(dt, spec, env=env)
def ensure_spec(self, spec, dt):
"""
Ensure that this container has enough space to hold the data for the
given spec. This returns a HistoryContainerDelta to represent the
changes in shape that the container made to support the new
HistorySpec.
"""
updated = {}
if spec.field not in self.fields:
updated['field'] = self._add_field(spec.field)
if spec.frequency not in self.largest_specs:
updated['frequency_delta'] = self._add_frequency(spec, dt)
if spec.bar_count > self.largest_specs[spec.frequency].bar_count:
updated['length_delta'] = self._add_length(spec, dt)
return HistoryContainerDelta(**updated)
def add_sids(self, to_add):
"""
@@ -212,7 +518,7 @@ class HistoryContainer(object):
self.sids = pd.Index(
sorted(self.sids + _ensure_index(to_add)),
)
self._realign()
self._realign_sids()
def drop_sids(self, to_drop):
"""
@@ -221,9 +527,9 @@ class HistoryContainer(object):
self.sids = pd.Index(
sorted(self.sids - _ensure_index(to_drop)),
)
self._realign()
self._realign_sids()
def _realign(self):
def _realign_sids(self):
"""
Realign our constituent panels after adding or removing sids.
"""
@@ -231,17 +537,26 @@ class HistoryContainer(object):
columns=self.sids,
)
for panel in self.all_panels:
panel.set_sids(self.sids)
panel.set_minor_axis(self.sids)
def create_digest_panels(self, initial_sids, initial_dt):
def _realign_fields(self):
self.last_known_prior_values = self.last_known_prior_values.reindex(
index=self.prior_values_index,
)
for panel in self.all_panels:
panel.set_items(self.fields)
@with_environment()
def create_digest_panels(self,
initial_sids,
initial_dt,
shift_digest,
env=None):
"""
Initialize a RollingPanel for each unique panel frequency being stored
by this container. Each RollingPanel pre-allocates enough storage
space to service the highest bar-count of any history call that it
serves.
Relies on the fact that group_by_frequency sorts the value lists by
ascending bar count.
"""
# Map from frequency -> first/last minute of the next digest to be
# rolled for that frequency.
@@ -250,33 +565,27 @@ class HistoryContainer(object):
# Map from frequency -> digest_panels.
panels = {}
for freq, specs in iteritems(self.frequency_groups):
# Relying on the sorting of group_by_frequency to get the spec
# requiring the largest number of bars.
largest_spec = specs[-1]
for freq, largest_spec in iteritems(self.largest_specs):
if largest_spec.bar_count == 1:
# No need to allocate a digest panel; this frequency will only
# ever use data drawn from self.buffer_panel.
first_window_starts[freq] = freq.window_open(initial_dt)
first_window_starts[freq] = freq.normalize(initial_dt)
first_window_closes[freq] = freq.window_close(
first_window_starts[freq]
)
continue
initial_dates = index_at_dt(largest_spec, initial_dt)
dt = initial_dt
if shift_digest:
dt = largest_spec.frequency.prev_bar(dt)
# Set up dates for our first digest roll, which is keyed to the
# close of the first entry in our initial index.
first_window_closes[freq] = initial_dates[0]
first_window_starts[freq] = freq.window_open(initial_dates[0])
rp = RollingPanel(
window=len(initial_dates) - 1,
items=self.fields,
sids=initial_sids,
rp = self._create_digest_panel(
dt,
spec=largest_spec,
window_starts=first_window_starts,
window_closes=first_window_closes,
env=env,
)
panels[freq] = rp
@@ -288,13 +597,18 @@ class HistoryContainer(object):
Initialize a RollingPanel containing enough minutes to service all our
frequencies.
"""
max_bars_needed = max(freq.max_minutes
for freq in self.unique_frequencies)
rp = RollingPanel(
window=max_bars_needed,
items=self.fields,
sids=self.sids,
max_bars_needed = max(
freq.max_bars for freq in self.unique_frequencies
)
freq = '1m' if self.data_frequency == 'minute' else '1d'
spec = HistorySpec(
max_bars_needed + 1, freq, None, None, self.data_frequency,
)
rp = self._create_panel(
initial_dt, spec,
)
self.buffer_spec = spec
return rp
def convert_columns(self, values):
@@ -388,7 +702,6 @@ class HistoryContainer(object):
Takes the bar at @algo_dt's @data, checks to see if we need to roll any
new digests, then adds new data to the buffer panel.
"""
frame = self.frame_from_bardata(data, algo_dt)
self.update_last_known_values()
@@ -490,7 +803,7 @@ class HistoryContainer(object):
Store the non-NaN values from our oldest frame in each frequency.
"""
ffillable = self.ffillable_fields
if len(ffillable) == 0:
if not len(ffillable):
return
for frequency in self.unique_frequencies:
@@ -513,7 +826,6 @@ class HistoryContainer(object):
Selects from the overarching history panel the values for the
@history_spec at the given @algo_dt.
"""
field = history_spec.field
do_ffill = history_spec.ffill
@@ -535,7 +847,6 @@ class HistoryContainer(object):
digest_frame,
self.last_known_prior_values,
)
last_period = self.frame_to_series(field, buffer_frame)
return fast_build_history_output(digest_frame, last_period, algo_dt)
+156 -36
View File
@@ -32,10 +32,151 @@ class RollingPanel(object):
Restrictions: major_axis can only be a DatetimeIndex for now
"""
def __init__(self,
window,
items,
sids,
cap_multiple=2,
dtype=np.float64,
date_buf=None):
self._pos = window
self._window = window
self.items = _ensure_index(items)
self.minor_axis = _ensure_index(sids)
self.cap_multiple = cap_multiple
self.cap = cap_multiple * window
self.dtype = dtype
self.date_buf = np.empty(self.cap, dtype='M8[ns]') \
if date_buf is None else date_buf
self.buffer = self._create_buffer()
@property
def _oldest_frame_idx(self):
return self._pos - self._window
def oldest_frame(self):
"""
Get the oldest frame in the panel.
"""
return self.buffer.iloc[:, self._oldest_frame_idx, :]
def set_minor_axis(self, minor_axis):
self.minor_axis = _ensure_index(minor_axis)
self.buffer = self.buffer.reindex(minor_axis=self.minor_axis)
def set_items(self, items):
self.items = _ensure_index(items)
self.buffer = self.buffer.reindex(items=self.items)
def _create_buffer(self):
panel = pd.Panel(
items=self.items,
minor_axis=self.minor_axis,
major_axis=range(self.cap),
dtype=self.dtype,
)
return panel
def resize(self, window):
"""
Resizes the buffer to hold a new window with a new cap_multiple.
If cap_multiple is None, then the old cap_multiple is used.
"""
self._window = window
pre = self.cap
self.cap = self.cap_multiple * window
delta = self.cap - pre
self._pos += delta
self.date_buf = self.date_buf.copy()
self.date_buf.resize(self.cap)
self.date_buf = np.roll(self.date_buf, delta)
self.buffer = pd.concat(
[
pd.Panel(
items=self.items,
minor_axis=self.minor_axis,
major_axis=np.arange(delta),
dtype=self.dtype,
),
self.buffer
],
axis=1,
)
self.buffer.major_axis = pd.Int64Index(range(self.cap))
def add_frame(self, tick, frame):
"""
"""
if self._pos == self.cap:
self._roll_data()
self.buffer.loc[:, self._pos, :] = frame.T.astype(self.dtype)
self.date_buf[self._pos] = tick
self._pos += 1
def get_current(self):
"""
Get a Panel that is the current data in view. It is not safe to persist
these objects because internal data might change
"""
where = slice(self._oldest_frame_idx, self._pos)
major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
return pd.Panel(self.buffer.values[:, where, :], self.items,
major_axis, self.minor_axis, dtype=self.dtype)
def set_current(self, panel):
"""
Set the values stored in our current in-view data to be values of the
passed panel. The passed panel must have the same indices as the panel
that would be returned by self.get_current.
"""
where = slice(self._oldest_frame_idx, self._pos)
self.buffer.values[:, where, :] = panel.values
def current_dates(self):
where = slice(self._oldest_frame_idx, self._pos)
return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
def _roll_data(self):
"""
Roll window worth of data up to position zero.
Save the effort of having to expensively roll at each iteration
"""
self.buffer.values[:, :self._window, :] = \
self.buffer.values[:, -self._window:, :]
self.date_buf[:self._window] = self.date_buf[-self._window:]
self._pos = self._window
@property
def window_length(self):
return self._window
class MutableIndexRollingPanel(object):
"""
A version of RollingPanel that exists for backwards compatibility with
batch_transform. This is a copy to allow behavior of RollingPanel to drift
away from this without breaking this class.
This code should be considered frozen, and should not be used in the
future. Instead, see RollingPanel.
"""
def __init__(self, window, items, sids, cap_multiple=2, dtype=np.float64):
self.pos = 0
self.window = window
self._pos = 0
self._window = window
self.items = _ensure_index(items)
self.minor_axis = _ensure_index(sids)
@@ -49,7 +190,7 @@ class RollingPanel(object):
self.buffer = self._create_buffer()
def _oldest_frame_idx(self):
return max(self.pos - self.window, 0)
return max(self._pos - self._window, 0)
def oldest_frame(self):
"""
@@ -70,24 +211,13 @@ class RollingPanel(object):
)
return panel
def add_frame(self, tick, frame):
"""
"""
if self.pos == self.cap:
self._roll_data()
self.buffer.loc[:, self.pos, :] = frame.T.astype(self.dtype)
self.date_buf[self.pos] = tick
self.pos += 1
def get_current(self):
"""
Get a Panel that is the current data in view. It is not safe to persist
these objects because internal data might change
"""
where = slice(self._oldest_frame_idx(), self.pos)
where = slice(self._oldest_frame_idx(), self._pos)
major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
return pd.Panel(self.buffer.values[:, where, :], self.items,
major_axis, self.minor_axis, dtype=self.dtype)
@@ -98,11 +228,11 @@ class RollingPanel(object):
passed panel. The passed panel must have the same indices as the panel
that would be returned by self.get_current.
"""
where = slice(self._oldest_frame_idx(), self.pos)
where = slice(self._oldest_frame_idx(), self._pos)
self.buffer.values[:, where, :] = panel.values
def current_dates(self):
where = slice(self._oldest_frame_idx(), self.pos)
where = slice(self._oldest_frame_idx(), self._pos)
return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
def _roll_data(self):
@@ -111,42 +241,32 @@ class RollingPanel(object):
Save the effort of having to expensively roll at each iteration
"""
self.buffer.values[:, :self.window, :] = \
self.buffer.values[:, -self.window:, :]
self.date_buf[:self.window] = self.date_buf[-self.window:]
self.pos = self.window
class MutableIndexRollingPanel(RollingPanel):
"""
Subclass of RollingPanel that mutates its indices in response to
newly-added frames. Exists primarily to maintain backward-compatibility
with the semantics of RollingPanel expected by BatchTransform.
This class is likely to be deprecated and/or removed in future versions.
"""
self.buffer.values[:, :self._window, :] = \
self.buffer.values[:, -self._window:, :]
self.date_buf[:self._window] = self.date_buf[-self._window:]
self._pos = self._window
def add_frame(self, tick, frame):
"""
"""
if self.pos == self.cap:
if self._pos == self.cap:
self._roll_data()
if set(frame.columns).difference(set(self.minor_axis)) or \
set(frame.index).difference(set(self.items)):
self._update_buffer(frame)
self.buffer.loc[:, self.pos, :] = frame.T.astype(self.dtype)
self.date_buf[self.pos] = tick
self.buffer.loc[:, self._pos, :] = frame.T.astype(self.dtype)
self.date_buf[self._pos] = tick
self.pos += 1
self._pos += 1
def _update_buffer(self, frame):
# Get current frame as we only need to care about the data that is in
# the active window
old_buffer = self.get_current()
if self.pos >= self.window:
if self._pos >= self._window:
# Don't count the last major_axis entry if we're past our window,
# since it's about to roll off the end of the panel.
old_buffer = old_buffer.iloc[:, 1:, :]