From f8f7f2fc4c06e648ff99ccc21225870bc17fb2f3 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 6 Oct 2014 13:37:09 -0400 Subject: [PATCH] ENH: Allows history to be dynamic and grow the container at runtime. Previously, all specs had to be pre-allocated by using the 'add_history' function. This is now no longer required and instead serves as a hint to the HistoryContainer to pre-allocate the space for the given spec. History can grow by increasing the length for a frequency, adding a frequency, or adding a field. It can grow with any combination of these. HistoryContainer now is aware of the data_frequency of the algorithm, and no longer uses the daily_at_midnight flag; instead, this is the default behavior. --- tests/history_cases.py | 14 +- tests/test_batchtransform.py | 3 +- tests/test_history.py | 276 +++++++++++++++- zipline/algorithm.py | 63 +++- zipline/finance/trading.py | 31 ++ zipline/history/__init__.py | 6 +- zipline/history/history.py | 107 ++++--- zipline/history/history_container.py | 451 ++++++++++++++++++++++----- zipline/utils/data.py | 192 +++++++++--- 9 files changed, 963 insertions(+), 180 deletions(-) diff --git a/tests/history_cases.py b/tests/history_cases.py index 02570976..d38c7dfa 100644 --- a/tests/history_cases.py +++ b/tests/history_cases.py @@ -534,9 +534,7 @@ HISTORY_CONTAINER_TEST_CASES = { to_utc('2013-06-28 9:31AM'), ], - # Missing volume data should manifest as 0's rather - # than nans. - ).fillna(0 if 'volume' in key else np.nan), + ), pd.DataFrame( data={ 1: [np.nan, 0, 1], @@ -547,7 +545,7 @@ HISTORY_CONTAINER_TEST_CASES = { to_utc('2013-06-28 9:31AM'), to_utc('2013-06-28 9:32AM'), ], - ).fillna(0 if 'volume' in key else np.nan), + ), pd.DataFrame( data={ @@ -560,11 +558,7 @@ HISTORY_CONTAINER_TEST_CASES = { to_utc('2013-06-28 9:33AM'), ], - # Note: Calling fillna() here even though there are - # no NaNs because this makes it less likely - # for us to introduce a stupid bug by - # copy/pasting in the future. 
- ).fillna(0 if 'volume' in key else np.nan), + ), pd.DataFrame( data={ 1: [2, np.nan, 3], @@ -575,6 +569,8 @@ HISTORY_CONTAINER_TEST_CASES = { to_utc('2013-06-28 9:34AM'), to_utc('2013-06-28 9:35AM'), ], + # For volume, when we are missing data, we replace + # it with 0s to show that no trades occurred. ).fillna(0 if 'volume' in key else np.nan), ], ) diff --git a/tests/test_batchtransform.py b/tests/test_batchtransform.py index ece1c2c8..6b89c8e5 100644 --- a/tests/test_batchtransform.py +++ b/tests/test_batchtransform.py @@ -122,8 +122,7 @@ class TestChangeOfSids(TestCase): for sid in self.sids[:i]: self.assertIn(sid, df.columns) - last_elem = len(df) - 1 - self.assertEqual(df[last_elem][last_elem], last_elem) + self.assertEqual(df.iloc[-1].iloc[-1], i) class TestBatchTransformMinutely(TestCase): diff --git a/tests/test_history.py b/tests/test_history.py index 0f9cd9a8..c95c9837 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -14,18 +14,24 @@ # limitations under the License. 
from unittest import TestCase +from itertools import product +from textwrap import dedent from nose_parameterized import parameterized import numpy as np import pandas as pd from pandas.util.testing import assert_frame_equal -from zipline.history import history +from zipline.history import history, Frequency from zipline.history.history_container import HistoryContainer from zipline.protocol import BarData import zipline.utils.factory as factory from zipline import TradingAlgorithm -from zipline.finance.trading import SimulationParameters, TradingEnvironment +from zipline.finance.trading import ( + SimulationParameters, + TradingEnvironment, + with_environment, +) from zipline.errors import IncompatibleHistoryFrequency from zipline.sources import RandomWalkSource, DataFrameSource @@ -131,7 +137,6 @@ def get_index_at_dt(case_input): case_input['frequency'], None, False, - daily_at_midnight=False, data_frequency='minute', ) return history.index_at_dt(history_spec, case_input['algo_dt']) @@ -197,7 +202,7 @@ class TestHistoryContainer(TestCase): self.assertEqual(len(expected[spec.key_str]), len(updates)) container = HistoryContainer( - {spec.key_str: spec for spec in specs}, sids, dt + {spec.key_str: spec for spec in specs}, sids, dt, 'minute', ) for update_count, update in enumerate(updates): @@ -222,7 +227,7 @@ class TestHistoryContainer(TestCase): frequency='1d', field='price', ffill=True, - daily_at_midnight=False + data_frequency='minute' ) specs = {spec.key_str: spec} initial_sids = [1, ] @@ -230,7 +235,8 @@ class TestHistoryContainer(TestCase): '2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC') container = HistoryContainer( - specs, initial_sids, initial_dt) + specs, initial_sids, initial_dt, 'minute' + ) bar_data = BarData() container.update(bar_data, initial_dt) @@ -373,7 +379,7 @@ def handle_data(context, data): end = pd.Timestamp('2006-03-30', tz='UTC') sim_params = factory.create_simulation_parameters( - start=start, end=end) + start=start, end=end, 
data_frequency='daily') _, df = factory.create_test_df_source(sim_params) df = df.astype(np.float64) @@ -867,3 +873,259 @@ def handle_data(context, data): # Depends on seed np.testing.assert_almost_equal(recorded_ma, 159.76304468946876) + + def test_history_container_constructed_at_runtime(self): + algo_text = dedent( + """\ + from zipline.api import history + def handle_data(context, data): + context.prices = history(2, '1d', 'price') + """ + ) + start = pd.Timestamp('2007-04-05', tz='UTC') + end = pd.Timestamp('2007-04-10', tz='UTC') + + sim_params = SimulationParameters( + period_start=start, + period_end=end, + capital_base=float("1.0e5"), + data_frequency='minute', + emission_rate='daily' + ) + + test_algo = TradingAlgorithm( + script=algo_text, + data_frequency='minute', + sim_params=sim_params + ) + + source = RandomWalkSource(start=start, end=end) + + self.assertIsNone(test_algo.history_container) + test_algo.run(source) + self.assertIsNotNone( + test_algo.history_container, + msg='HistoryContainer was not constructed at runtime', + ) + + container = test_algo.history_container + self.assertEqual( + container.buffer_panel.window_length, + Frequency.MAX_MINUTES['d'], + msg='HistoryContainer.buffer_panel was not large enough to service' + ' the given HistorySpec', + ) + + self.assertEqual( + len(container.digest_panels), + 1, + msg='The HistoryContainer created too many digest panels', + ) + + freq, digest = list(container.digest_panels.items())[0] + self.assertEqual( + freq.unit_str, + 'd', + ) + + self.assertEqual( + digest.window_length, + 1, + msg='The digest panel is not large enough to service the given' + ' HistorySpec', + ) + + +class TestHistoryContainerResize(TestCase): + @parameterized.expand( + (freq, field, data_frequency, construct_digest) + for freq in ('1m', '1d') + for field in HistoryContainer.VALID_FIELDS + for data_frequency in ('minute', 'daily') + for construct_digest in (True, False) + if not (freq == '1m' and data_frequency == 'daily') 
+ ) + def test_history_grow_length(self, + freq, + field, + data_frequency, + construct_digest): + bar_count = 2 if construct_digest else 1 + spec = history.HistorySpec( + bar_count=bar_count, + frequency=freq, + field=field, + ffill=True, + data_frequency=data_frequency, + ) + specs = {spec.key_str: spec} + initial_sids = [1] + initial_dt = pd.Timestamp( + '2013-06-28 13:31AM' + if data_frequency == 'minute' + else '2013-06-28 12:00AM', + tz='UTC', + ) + + container = HistoryContainer( + specs, initial_sids, initial_dt, data_frequency, + ) + + if construct_digest: + self.assertEqual( + container.digest_panels[spec.frequency].window_length, 1, + ) + + bar_data = BarData() + container.update(bar_data, initial_dt) + + to_add = ( + history.HistorySpec( + bar_count=bar_count + 1, + frequency=freq, + field=field, + ffill=True, + data_frequency=data_frequency, + ), + history.HistorySpec( + bar_count=bar_count + 2, + frequency=freq, + field=field, + ffill=True, + data_frequency=data_frequency, + ), + ) + + for spec in to_add: + container.ensure_spec(spec, initial_dt) + + self.assertEqual( + container.digest_panels[spec.frequency].window_length, + spec.bar_count - 1, + ) + + self.assert_history(container, spec, initial_dt) + + @parameterized.expand( + (bar_count, freq, pair, data_frequency) + for bar_count in (1, 2) + for freq in ('1m', '1d') + for pair in product(HistoryContainer.VALID_FIELDS, repeat=2) + for data_frequency in ('minute', 'daily') + if not (freq == '1m' and data_frequency == 'daily') + ) + def test_history_add_field(self, bar_count, freq, pair, data_frequency): + first, second = pair + spec = history.HistorySpec( + bar_count=bar_count, + frequency=freq, + field=first, + ffill=True, + data_frequency=data_frequency, + ) + specs = {spec.key_str: spec} + initial_sids = [1] + initial_dt = pd.Timestamp( + '2013-06-28 13:31AM' + if data_frequency == 'minute' + else '2013-06-28 12:00AM', + tz='UTC', + ) + + container = HistoryContainer( + specs, initial_sids, 
initial_dt, data_frequency, + ) + + if bar_count > 1: + self.assertEqual( + container.digest_panels[spec.frequency].window_length, 1, + ) + + bar_data = BarData() + container.update(bar_data, initial_dt) + + new_spec = history.HistorySpec( + bar_count, + frequency=freq, + field=second, + ffill=True, + data_frequency=data_frequency, + ) + + container.ensure_spec(new_spec, initial_dt) + + if bar_count > 1: + digest_panel = container.digest_panels[new_spec.frequency] + self.assertEqual(digest_panel.window_length, bar_count - 1) + self.assertIn(second, digest_panel.items) + else: + self.assertNotIn(new_spec.frequency, container.digest_panels) + + self.assert_history(container, new_spec, initial_dt) + + @parameterized.expand( + (bar_count, pair, field, data_frequency) + for bar_count in (1, 2) + for pair in product(('1m', '1d'), repeat=2) + for field in HistoryContainer.VALID_FIELDS + for data_frequency in ('minute', 'daily') + if not ('1m' in pair and data_frequency == 'daily') + ) + def test_history_add_freq(self, bar_count, pair, field, data_frequency): + first, second = pair + spec = history.HistorySpec( + bar_count=bar_count, + frequency=first, + field=field, + ffill=True, + data_frequency=data_frequency, + ) + specs = {spec.key_str: spec} + initial_sids = [1] + initial_dt = pd.Timestamp( + '2013-06-28 13:31AM' + if data_frequency == 'minute' + else '2013-06-28 12:00AM', + tz='UTC', + ) + + container = HistoryContainer( + specs, initial_sids, initial_dt, data_frequency, + ) + + if bar_count > 1: + self.assertEqual( + container.digest_panels[spec.frequency].window_length, 1, + ) + + bar_data = BarData() + container.update(bar_data, initial_dt) + + new_spec = history.HistorySpec( + bar_count, + frequency=second, + field=field, + ffill=True, + data_frequency=data_frequency, + ) + + container.ensure_spec(new_spec, initial_dt) + + if bar_count > 1: + digest_panel = container.digest_panels[new_spec.frequency] + self.assertEqual(digest_panel.window_length, bar_count - 1) 
+ else: + self.assertNotIn(new_spec.frequency, container.digest_panels) + + self.assert_history(container, new_spec, initial_dt) + + @with_environment() + def assert_history(self, container, spec, dt, env=None): + hst = container.get_history(spec, dt) + + self.assertEqual(len(hst), spec.bar_count) + + back = spec.frequency.prev_bar + for n in reversed(hst.index): + self.assertEqual(dt, n) + dt = back(dt) diff --git a/zipline/algorithm.py b/zipline/algorithm.py index 95c74a0a..a9438e02 100644 --- a/zipline/algorithm.py +++ b/zipline/algorithm.py @@ -181,6 +181,9 @@ class TradingAlgorithm(object): self._portfolio = None self._account = None + self.history_container_class = kwargs.pop( + 'history_container_class', HistoryContainer, + ) self.history_container = None self.history_specs = {} @@ -435,11 +438,13 @@ class TradingAlgorithm(object): self.sim_params._update_internal() # Create history containers - if len(self.history_specs) != 0: - self.history_container = HistoryContainer( + if self.history_specs: + self.history_container = self.history_container_class( self.history_specs, self.sim_params.sids, - self.sim_params.first_open) + self.sim_params.first_open, + self.sim_params.data_frequency, + ) # Create transforms by wrapping them into StatefulTransforms self.transforms = [] @@ -912,21 +917,54 @@ class TradingAlgorithm(object): self.blotter.cancel(order_id) @api_method - def add_history(self, bar_count, frequency, field, - ffill=True): + def add_history(self, bar_count, frequency, field, ffill=True): data_frequency = self.sim_params.data_frequency - daily_at_midnight = (data_frequency == 'daily') - history_spec = HistorySpec(bar_count, frequency, field, ffill, - daily_at_midnight=daily_at_midnight, data_frequency=data_frequency) self.history_specs[history_spec.key_str] = history_spec + if self.initialized: + if self.history_container: + self.history_container.ensure_spec(history_spec, self.datetime) + else: + self.history_container = self.history_container_class( 
+ self.trade_sources.history_backfill, + self.history_specs, + self.multiverse.current_sids, + self.sim_params.first_open, + self.sim_params.data_frequency, + ) + + def get_history_spec(self, bar_count, frequency, field, ffill): + spec_key = HistorySpec.spec_key(bar_count, frequency, field, ffill) + if spec_key not in self.history_specs: + data_freq = self.sim_params.data_frequency + spec = HistorySpec( + bar_count, + frequency, + field, + ffill, + data_frequency=data_freq, + ) + self.history_specs[spec_key] = spec + if not self.history_container: + self.history_container = self.history_container_class( + self.history_specs, + self.current_universe(), + self.datetime, + self.sim_params.data_frequency, + shift_digest=True, + ) + self.history_container.ensure_spec(spec, self.datetime) + return self.history_specs[spec_key] @api_method def history(self, bar_count, frequency, field, ffill=True): - spec_key_str = HistorySpec.spec_key( - bar_count, frequency, field, ffill) - history_spec = self.history_specs[spec_key_str] + history_spec = self.get_history_spec( + bar_count, + frequency, + field, + ffill, + ) return self.history_container.get_history(history_spec, self.datetime) #################### @@ -994,6 +1032,9 @@ class TradingAlgorithm(object): """ self.register_trading_control(LongOnly()) + def current_universe(self): + return self.sim_params.sids + @classmethod def all_api_methods(cls): """ diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index fb31d6e6..3c9cae3b 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -16,6 +16,7 @@ import bisect import logbook import datetime +from functools import wraps import pandas as pd import numpy as np @@ -451,3 +452,33 @@ class SimulationParameters(object): emission_rate=self.emission_rate, first_open=self.first_open, last_close=self.last_close) + + +def with_environment(asname='env'): + """ + Decorator to automagically pass TradingEnvironment to the function + under the name 
asname. If the environment is passed explicitly as a keyword + then the explicitly passed value will be used instead. + + usage: + @with_environment() + def f(env=None): + pass + + @with_environment(asname='my_env') + def g(my_env=None): + pass + """ + def with_environment_decorator(f): + @wraps(f) + def wrapper(*args, **kwargs): + # inject env into the namespace for the function. + # This doesn't use setdefault so that grabbing the trading env + # is lazy. + if asname not in kwargs: + kwargs[asname] = TradingEnvironment.instance() + return f(*args, **kwargs) + + return wrapper + + return with_environment_decorator diff --git a/zipline/history/__init__.py b/zipline/history/__init__.py index 5db895c9..d9a8a1cc 100644 --- a/zipline/history/__init__.py +++ b/zipline/history/__init__.py @@ -16,7 +16,8 @@ from . history import ( HistorySpec, days_index_at_dt, - index_at_dt + index_at_dt, + Frequency, ) from . import history_container @@ -25,5 +26,6 @@ __all__ = [ 'HistorySpec', 'days_index_at_dt', 'index_at_dt', - 'history_container' + 'history_container', + 'Frequency', ] diff --git a/zipline/history/history.py b/zipline/history/history.py index 027ffe45..b321e0a7 100644 --- a/zipline/history/history.py +++ b/zipline/history/history.py @@ -20,6 +20,7 @@ import pandas as pd import re from zipline.finance import trading +from zipline.finance.trading import with_environment from zipline.errors import IncompatibleHistoryFrequency @@ -42,8 +43,9 @@ class Frequency(object): """ SUPPORTED_FREQUENCIES = frozenset({'1d', '1m'}) MAX_MINUTES = {'m': 1, 'd': 390} + MAX_DAYS = {'d': 1} - def __init__(self, freq_str, daily_at_midnight=False): + def __init__(self, freq_str, data_frequency): if freq_str not in self.SUPPORTED_FREQUENCIES: raise ValueError( @@ -58,7 +60,7 @@ class Frequency(object): # unit_str - The unit type, e.g. 
'd' self.num, self.unit_str = parse_freq_str(freq_str) - self.daily_at_midnight = daily_at_midnight + self.data_frequency = data_frequency def next_window_start(self, previous_window_close): """ @@ -67,22 +69,22 @@ class Frequency(object): """ if self.unit_str == 'd': return self.next_day_window_start(previous_window_close, - self.daily_at_midnight) + self.data_frequency) elif self.unit_str == 'm': return self.next_minute_window_start(previous_window_close) @staticmethod - def next_day_window_start(previous_window_close, daily_at_midnight=False): + def next_day_window_start(previous_window_close, data_frequency='minute'): """ Get the next day window start after @previous_window_close. This is defined as the first market open strictly greater than @previous_window_close. """ env = trading.environment - if daily_at_midnight: + if data_frequency == 'daily': next_open = env.next_trading_day(previous_window_close) else: - next_open, _ = env.next_open_and_close(previous_window_close) + next_open = env.next_market_minute(previous_window_close) return next_open @staticmethod @@ -128,7 +130,7 @@ class Frequency(object): offset=-(num_days - 1) ).market_open.iloc[0] - if self.daily_at_midnight: + if self.data_frequency == 'daily': open_ = pd.tslib.normalize_date(open_) return open_ @@ -150,44 +152,23 @@ class Frequency(object): def day_window_close(self, window_start, num_days): """ - Get the last minute for a daily window of length @num_days with first - minute @window_start. This is calculated by searching forward until - @num_days market closes are encountered. + Get the window close for a daily frequency. + If the data_frequency is minute, then this will be the last minute of + last day of the window. 
- Examples: - - window_start = Thursday March 2nd, 2006, 9:31 AM EST - num_days = 1 - --> window_close = Thursday March 2nd, 2006, 4:00 PM EST - - window_start = Thursday March 2nd, 2006, 3:59 AM EST - num_days = 1 - --> window_close = Thursday March 2nd, 2006, 4:00 PM EST - - window_start = Thursday March 2nd, 2006, 9:31 AM EST - num_days = 2 - --> window_close = Friday March 2nd, 2006, 4:00 PM EST - - window_start = Thursday March 2nd, 2006, 9:31 AM EST - num_days = 3 - --> window_close = Monday March 6th, 2006, 4:00 PM EST - - # Day before July 4th is an early close - window_start = Wednesday July 3rd, 2013, 9:31 AM EST - num_days = 1 - --> window_close = Wednesday July 3rd, 2013, 1:00 PM EST + If the data_frequency is daily, this will be midnight utc of the last + day of the window. """ env = trading.environment - close = env.open_close_window( - window_start, - 1, - offset=num_days - 1 - ).market_close.iloc[0] - if self.daily_at_midnight: - close = pd.tslib.normalize_date(close) + if self.data_frequency != 'daily': + return env.get_open_and_close( + env.add_trading_days(num_days - 1, window_start), + )[1] - return close + return pd.tslib.normalize_date( + env.add_trading_days(num_days - 1, window_start), + ) def minute_window_close(self, window_start, num_minutes): """ @@ -204,13 +185,53 @@ class Frequency(object): env = trading.environment return env.market_minute_window(window_start, count=num_minutes)[-1] + @with_environment() + def prev_bar(self, dt, env=None): + """ + Returns the previous bar for dt. + """ + if self.unit_str == 'd': + if self.data_frequency == 'minute': + func = lambda dt: env.get_open_and_close( + env.previous_trading_day(dt), + )[1] + else: + func = env.previous_trading_day + else: + func = env.previous_market_minute + + # Cache the function dispatch. 
+ self.prev_bar = func + + return func(dt) + + @property + def max_bars(self): + if self.data_frequency == 'daily': + return self.max_days + else: + return self.max_minutes + + @property + def max_days(self): + if self.data_frequency != 'daily': + raise ValueError('max_days requested in minute mode') + return self.MAX_DAYS[self.unit_str] * self.num + @property def max_minutes(self): """ The maximum number of minutes required to roll a bar at this frequency. """ + if self.data_frequency != 'minute': + raise ValueError('max_minutes requested in daily mode') return self.MAX_MINUTES[self.unit_str] * self.num + def normalize(self, dt): + if self.data_frequency != 'daily': + return dt + return pd.tslib.normalize_date(dt) + def __eq__(self, other): return self.freq_str == other.freq_str @@ -242,12 +263,12 @@ class HistorySpec(object): bar_count, freq_str, field, ffill) def __init__(self, bar_count, frequency, field, ffill, - daily_at_midnight=False, data_frequency='daily'): + data_frequency='daily'): # Number of bars to look back. self.bar_count = bar_count if isinstance(frequency, str): - frequency = Frequency(frequency, daily_at_midnight) + frequency = Frequency(frequency, data_frequency) if frequency.unit_str == 'm' and data_frequency == 'daily': raise IncompatibleHistoryFrequency( frequency=frequency.unit_str, @@ -293,7 +314,7 @@ def days_index_at_dt(history_spec, algo_dt): step=history_spec.frequency.num, ).market_close - if history_spec.frequency.daily_at_midnight: + if history_spec.frequency.data_frequency == 'daily': market_closes = market_closes.apply(pd.tslib.normalize_date) # Append the current algo_dt as the last index value. diff --git a/zipline/history/history_container.py b/zipline/history/history_container.py index 02163264..84498669 100644 --- a/zipline/history/history_container.py +++ b/zipline/history/history_container.py @@ -12,18 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +from bisect import insort_left +from collections import namedtuple from itertools import groupby, product + import logbook import numpy as np import pandas as pd - from six import itervalues, iteritems, iterkeys -from . history import ( - index_at_dt, - HistorySpec, -) +from . history import HistorySpec +from zipline.finance.trading import with_environment from zipline.utils.data import RollingPanel, _ensure_index logger = logbook.Logger('History Container') @@ -43,7 +43,6 @@ def ffill_buffer_from_prior_values(freq, Forward-fill a buffer frame, falling back to the end-of-period values of a digest frame if the buffer frame has leading NaNs. """ - nan_sids = buffer_frame.iloc[0].isnull() if any(nan_sids) and len(digest_frame): # If we have any leading nans in the buffer and we have a non-empty @@ -88,33 +87,84 @@ def freq_str_and_bar_count(history_spec): return (history_spec.frequency.freq_str, history_spec.bar_count) -def group_by_frequency(history_specs): +@with_environment() +def next_bar(spec, env): """ - Takes an iterable of history specs and returns a dictionary mapping unique - frequencies to a list of specs with that frequency. - - Within each list, the HistorySpecs are sorted by ascending bar count. - - Example: - - [HistorySpec(3, '1d', 'price', True), - HistorySpec(2, '2d', 'open', True), - HistorySpec(2, '1d', 'open', False), - HistorySpec(5, '1m', 'open', True)] - - yields - - {Frequency('1d') : [HistorySpec(2, '1d', 'open', False)], - HistorySpec(3, '1d', 'price', True), - Frequency('2d') : [HistorySpec(2, '2d', 'open', True)], - Frequency('1m') : [HistorySpec(5, '1m', 'open', True)]} + Returns a function that will return the next bar for a given datetime. 
""" - return {key: list(group) + if spec.frequency.unit_str == 'd': + if spec.frequency.data_frequency == 'minute': + return lambda dt: env.get_open_and_close( + env.next_trading_day(dt), + )[1] + else: + return env.next_trading_day + else: + return env.next_market_minute + + +def compute_largest_specs(history_specs): + """ + Maps a Frequency to the largest HistorySpec at that frequency from an + iterable of HistorySpecs. + """ + return {key: max(group, key=lambda f: f.bar_count) for key, group in groupby( sorted(history_specs, key=freq_str_and_bar_count), key=lambda spec: spec.frequency)} +# tuples to store a change to the shape of a HistoryContainer + +FrequencyDelta = namedtuple( + 'FrequencyDelta', + ['freq', 'buffer_delta'], +) + + +LengthDelta = namedtuple( + 'LengthDelta', + ['freq', 'delta'], +) + + +HistoryContainerDeltaSuper = namedtuple( + 'HistoryContainerDelta', + ['field', 'frequency_delta', 'length_delta'], +) + + +class HistoryContainerDelta(HistoryContainerDeltaSuper): + """ + A class representing a resize of the history container. + """ + def __new__(cls, field=None, frequency_delta=None, length_delta=None): + """ + field is a new field that was added. + frequency is a FrequencyDelta representing a new frequency was added. + length is a bar LengthDelta which is a frequency and a bar_count. + If any field is None, then no change occurred of that type. + """ + return super(HistoryContainerDelta, cls).__new__( + cls, field, frequency_delta, length_delta, + ) + + @property + def empty(self): + """ + Checks if the delta is empty. + """ + return (self.field is None + and self.frequency_delta is None + and self.length_delta is None) + + +def normalize_to_data_freq(data_frequency, dt): + if data_frequency == 'minute': + return dt + return pd.tslib.normalize_date(dt) + + class HistoryContainer(object): """ Container for all history panels and frames used by an algoscript. 
@@ -124,33 +174,64 @@ class HistoryContainer(object): Entry point for the algoscript is the result of `get_history`. """ + VALID_FIELDS = { + 'price', 'open_price', 'volume', 'high', 'low', 'close_price', + } - def __init__(self, history_specs, initial_sids, initial_dt): + def __init__(self, + history_specs, + initial_sids, + initial_dt, + data_frequency, + shift_digest=False): + """ + A container to hold a rolling window of historical data within a user's + algorithm. + + Args: + history_specs (dict[Frequency:HistorySpec]): The starting history + specs that this container should be able to service. + + initial_sids (set[Security or Int]): The starting sids to watch. + + initial_dt (datetime): The datetime to start collecting history from. + + shift_digest (bool): If True, then the digest panels will be created + shifted back by one bar, this is to facilitate the creation of a + HistoryContainer during a call to handle_data within + TradingAlgorithm. This is False by default. + + Returns: + An instance of a new HistoryContainer + """ # History specs to be served by this container. self.history_specs = history_specs - self.frequency_groups = \ - group_by_frequency(itervalues(self.history_specs)) + self.largest_specs = compute_largest_specs( + itervalues(self.history_specs) + ) # The set of fields specified by all history specs self.fields = pd.Index( sorted(set(spec.field for spec in itervalues(history_specs))) ) self.sids = pd.Index( - sorted(set(initial_sids)) + sorted(set(initial_sids or [])) ) + self.data_frequency = data_frequency + + initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt) + # This panel contains raw minutes for periods that haven't been fully # completed. When a frequency period rolls over, these minutes are # digested using some sort of aggregation call on the panel (e.g. `sum` # for volume, `max` for high, `min` for low, etc.). 
- self.buffer_panel = self.create_buffer_panel( - initial_dt, - ) + self.buffer_panel = self.create_buffer_panel(initial_dt) # Dictionaries with Frequency objects as keys. self.digest_panels, self.cur_window_starts, self.cur_window_closes = \ - self.create_digest_panels(initial_sids, initial_dt) + self.create_digest_panels(initial_sids, initial_dt, shift_digest) # Helps prop up the prior day panel against having a nan, when the data # has been seen. @@ -203,7 +284,232 @@ class HistoryContainer(object): Return an iterator over all the unique frequencies serviced by this container. """ - return iterkeys(self.frequency_groups) + return iterkeys(self.largest_specs) + + @with_environment() + def _add_frequency(self, spec, dt, env=None): + """ + Adds a new frequency to the container. This reshapes the buffer_panel + if needed. + """ + freq = spec.frequency + self.largest_specs[freq] = spec + new_buffer_len = 0 + + if freq.max_bars > self.buffer_panel.window_length: + # More bars need to be held in the buffer_panel to support this + # freq + if freq.data_frequency \ + != self.buffer_spec.frequency.data_frequency: + # If the data_frequencies are not the same, then we need to + # create a fresh buffer. + self.buffer_panel = self.create_buffer_panel( + dt, shift_digest=True, + ) + new_buffer_len = None + else: + # The frequencies are the same, we just need to add more bars. + self._resize_panel( + self.buffer_panel, + freq.max_bars, + dt, + self.buffer_spec.frequency, + ) + new_buffer_len = freq.max_minutes + # update the current buffer_spec to reflect the new length. + self.buffer_spec.bar_count = new_buffer_len + 1 + + if spec.bar_count > 1: + # This spec has more than one bar, construct a digest panel for it. 
+ self.digest_panels[freq] = self._create_digest_panel( + dt, spec=spec, env=env, + ) + else: + self.cur_window_starts[freq] = dt + self.cur_window_closes[freq] = freq.window_close( + self.cur_window_starts[freq] + ) + + self.last_known_prior_values = self.last_known_prior_values.reindex( + index=self.prior_values_index, + ) + + return FrequencyDelta(freq, new_buffer_len) + + def _add_field(self, field): + """ + Adds a new field to the container. + """ + # self.fields is already sorted, so we just need to insert the new + # field in the correct index. + ls = list(self.fields) + insort_left(ls, field) + self.fields = pd.Index(ls) + + self._realign_fields() + self.last_known_prior_values = self.last_known_prior_values.reindex( + index=self.prior_values_index, + ) + return field + + @with_environment() + def _add_length(self, spec, dt, env=None): + """ + Increases the length of the digest panel for spec.frequency. If this + does not have a panel, and one is needed; a digest panel will be + constructed. + """ + old_count = self.largest_specs[spec.frequency].bar_count + self.largest_specs[spec.frequency] = spec + delta = spec.bar_count - old_count + + panel = self.digest_panels.get(spec.frequency) + + if panel is None: + # The old length for this frequency was 1 bar, meaning no digest + # panel was held. We must construct a new one here. + panel = self._create_digest_panel( + dt, spec=spec, env=env, + ) + + else: + self._resize_panel( + panel, spec.bar_count - 1, dt, freq=spec.frequency, env=env, + ) + + self.digest_panels[spec.frequency] = panel + + return LengthDelta(spec.frequency, delta) + + @with_environment() + def _resize_panel(self, panel, size, dt, freq, env=None): + """ + Resizes a panel, fills the date_buf with the correct values. + """ + # This is the oldest datetime that will be shown in the current window + # of the panel. 
+ oldest_idx = panel._oldest_frame_idx + oldest_dt = pd.Timestamp( + panel.date_buf[oldest_idx], tz='utc', + ) + old_cap = panel.cap + panel.resize(size) + + delta = (old_cap - oldest_idx) - panel._oldest_frame_idx + + # Backfill the missing dates of the new current window. + missing_dts = self._create_window_date_buf( + delta, freq.unit_str, freq.data_frequency, oldest_dt, + ) + + # Fill the dates in between the new oldest index and adjusted oldest + # index. + where = slice(panel._oldest_frame_idx, -(old_cap - oldest_idx)) + panel.date_buf[where] = missing_dts + + @with_environment() + def _create_window_date_buf(self, + window, + unit_str, + data_frequency, + dt, + env=None): + """ + Creates a window length date_buf looking backwards from dt. + """ + if unit_str == 'd': + # Get the properly key'd datetime64 out of the pandas Timestamp + if data_frequency != 'daily': + arr = env.open_close_window( + dt, + window, + offset=-window, + ).market_close.astype('datetime64[ns]').values + else: + arr = env.open_close_window( + dt, + window, + offset=-window, + ).index.values + + return arr + else: + return env.market_minute_window( + env.previous_market_minute(dt), + window, + step=-1, + )[::-1].values + + @with_environment() + def _create_panel(self, dt, spec, env=None): + """ + Constructs a rolling panel with a properly aligned date_buf. + """ + dt = normalize_to_data_freq(spec.frequency.data_frequency, dt) + + window = spec.bar_count - 1 + + # everything after dt is going to be filled from calling update, no + # need to precompute these dates. 
+ second = np.empty(window, dtype='datetime64[ns]') + date_buf = np.hstack( + (self._create_window_date_buf( + window, + spec.frequency.unit_str, + spec.frequency.data_frequency, + dt, + env=env, + ), second), + ) + + panel = RollingPanel( + window=window, + items=self.fields, + sids=self.sids, + date_buf=date_buf, + ) + + return panel + + @with_environment() + def _create_digest_panel(self, + dt, + spec, + window_starts=None, + window_closes=None, + env=None): + """ + Creates a digest panel, setting the window_starts and window_closes. + If window_starts or window_closes are None, then self.cur_window_starts + or self.cur_window_closes will be used. + """ + freq = spec.frequency + + window_starts = window_starts if window_starts is not None \ + else self.cur_window_starts + window_closes = window_closes if window_closes is not None \ + else self.cur_window_closes + + window_starts[freq] = freq.normalize(dt) + window_closes[freq] = freq.window_close(window_starts[freq]) + + return self._create_panel(dt, spec, env=env) + + def ensure_spec(self, spec, dt): + """ + Ensure that this container has enough space to hold the data for the + given spec. This returns a HistoryContainerDelta to represent the + changes in shape that the container made to support the new + HistorySpec. 
+ """ + updated = {} + if spec.field not in self.fields: + updated['field'] = self._add_field(spec.field) + if spec.frequency not in self.largest_specs: + updated['frequency_delta'] = self._add_frequency(spec, dt) + if spec.bar_count > self.largest_specs[spec.frequency].bar_count: + updated['length_delta'] = self._add_length(spec, dt) + return HistoryContainerDelta(**updated) def add_sids(self, to_add): """ @@ -212,7 +518,7 @@ class HistoryContainer(object): self.sids = pd.Index( sorted(self.sids + _ensure_index(to_add)), ) - self._realign() + self._realign_sids() def drop_sids(self, to_drop): """ @@ -221,9 +527,9 @@ class HistoryContainer(object): self.sids = pd.Index( sorted(self.sids - _ensure_index(to_drop)), ) - self._realign() + self._realign_sids() - def _realign(self): + def _realign_sids(self): """ Realign our constituent panels after adding or removing sids. """ @@ -231,17 +537,26 @@ class HistoryContainer(object): columns=self.sids, ) for panel in self.all_panels: - panel.set_sids(self.sids) + panel.set_minor_axis(self.sids) - def create_digest_panels(self, initial_sids, initial_dt): + def _realign_fields(self): + self.last_known_prior_values = self.last_known_prior_values.reindex( + index=self.prior_values_index, + ) + for panel in self.all_panels: + panel.set_items(self.fields) + + @with_environment() + def create_digest_panels(self, + initial_sids, + initial_dt, + shift_digest, + env=None): """ Initialize a RollingPanel for each unique panel frequency being stored by this container. Each RollingPanel pre-allocates enough storage space to service the highest bar-count of any history call that it serves. - - Relies on the fact that group_by_frequency sorts the value lists by - ascending bar count. """ # Map from frequency -> first/last minute of the next digest to be # rolled for that frequency. @@ -250,33 +565,27 @@ class HistoryContainer(object): # Map from frequency -> digest_panels. 
panels = {} - for freq, specs in iteritems(self.frequency_groups): - - # Relying on the sorting of group_by_frequency to get the spec - # requiring the largest number of bars. - largest_spec = specs[-1] + for freq, largest_spec in iteritems(self.largest_specs): if largest_spec.bar_count == 1: - # No need to allocate a digest panel; this frequency will only # ever use data drawn from self.buffer_panel. - first_window_starts[freq] = freq.window_open(initial_dt) + first_window_starts[freq] = freq.normalize(initial_dt) first_window_closes[freq] = freq.window_close( first_window_starts[freq] ) continue - initial_dates = index_at_dt(largest_spec, initial_dt) + dt = initial_dt + if shift_digest: + dt = largest_spec.frequency.prev_bar(dt) - # Set up dates for our first digest roll, which is keyed to the - # close of the first entry in our initial index. - first_window_closes[freq] = initial_dates[0] - first_window_starts[freq] = freq.window_open(initial_dates[0]) - - rp = RollingPanel( - window=len(initial_dates) - 1, - items=self.fields, - sids=initial_sids, + rp = self._create_digest_panel( + dt, + spec=largest_spec, + window_starts=first_window_starts, + window_closes=first_window_closes, + env=env, ) panels[freq] = rp @@ -288,13 +597,18 @@ class HistoryContainer(object): Initialize a RollingPanel containing enough minutes to service all our frequencies. 
""" - max_bars_needed = max(freq.max_minutes - for freq in self.unique_frequencies) - rp = RollingPanel( - window=max_bars_needed, - items=self.fields, - sids=self.sids, + max_bars_needed = max( + freq.max_bars for freq in self.unique_frequencies ) + freq = '1m' if self.data_frequency == 'minute' else '1d' + spec = HistorySpec( + max_bars_needed + 1, freq, None, None, self.data_frequency, + ) + + rp = self._create_panel( + initial_dt, spec, + ) + self.buffer_spec = spec return rp def convert_columns(self, values): @@ -388,7 +702,6 @@ class HistoryContainer(object): Takes the bar at @algo_dt's @data, checks to see if we need to roll any new digests, then adds new data to the buffer panel. """ - frame = self.frame_from_bardata(data, algo_dt) self.update_last_known_values() @@ -490,7 +803,7 @@ class HistoryContainer(object): Store the non-NaN values from our oldest frame in each frequency. """ ffillable = self.ffillable_fields - if len(ffillable) == 0: + if not len(ffillable): return for frequency in self.unique_frequencies: @@ -513,7 +826,6 @@ class HistoryContainer(object): Selects from the overarching history panel the values for the @history_spec at the given @algo_dt. 
""" - field = history_spec.field do_ffill = history_spec.ffill @@ -535,7 +847,6 @@ class HistoryContainer(object): digest_frame, self.last_known_prior_values, ) - last_period = self.frame_to_series(field, buffer_frame) return fast_build_history_output(digest_frame, last_period, algo_dt) diff --git a/zipline/utils/data.py b/zipline/utils/data.py index caf9167e..ef5f80f8 100644 --- a/zipline/utils/data.py +++ b/zipline/utils/data.py @@ -32,10 +32,151 @@ class RollingPanel(object): Restrictions: major_axis can only be a DatetimeIndex for now """ + def __init__(self, + window, + items, + sids, + cap_multiple=2, + dtype=np.float64, + date_buf=None): + + self._pos = window + self._window = window + + self.items = _ensure_index(items) + self.minor_axis = _ensure_index(sids) + + self.cap_multiple = cap_multiple + self.cap = cap_multiple * window + + self.dtype = dtype + self.date_buf = np.empty(self.cap, dtype='M8[ns]') \ + if date_buf is None else date_buf + + self.buffer = self._create_buffer() + + @property + def _oldest_frame_idx(self): + return self._pos - self._window + + def oldest_frame(self): + """ + Get the oldest frame in the panel. + """ + return self.buffer.iloc[:, self._oldest_frame_idx, :] + + def set_minor_axis(self, minor_axis): + self.minor_axis = _ensure_index(minor_axis) + self.buffer = self.buffer.reindex(minor_axis=self.minor_axis) + + def set_items(self, items): + self.items = _ensure_index(items) + self.buffer = self.buffer.reindex(items=self.items) + + def _create_buffer(self): + panel = pd.Panel( + items=self.items, + minor_axis=self.minor_axis, + major_axis=range(self.cap), + dtype=self.dtype, + ) + return panel + + def resize(self, window): + """ + Resizes the buffer to hold a new window with a new cap_multiple. + If cap_multiple is None, then the old cap_multiple is used. 
+ """ + self._window = window + + pre = self.cap + self.cap = self.cap_multiple * window + delta = self.cap - pre + + self._pos += delta + + self.date_buf = self.date_buf.copy() + self.date_buf.resize(self.cap) + self.date_buf = np.roll(self.date_buf, delta) + + self.buffer = pd.concat( + [ + pd.Panel( + items=self.items, + minor_axis=self.minor_axis, + major_axis=np.arange(delta), + dtype=self.dtype, + ), + self.buffer + ], + axis=1, + ) + self.buffer.major_axis = pd.Int64Index(range(self.cap)) + + def add_frame(self, tick, frame): + """ + """ + if self._pos == self.cap: + self._roll_data() + + self.buffer.loc[:, self._pos, :] = frame.T.astype(self.dtype) + self.date_buf[self._pos] = tick + + self._pos += 1 + + def get_current(self): + """ + Get a Panel that is the current data in view. It is not safe to persist + these objects because internal data might change + """ + + where = slice(self._oldest_frame_idx, self._pos) + major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc') + return pd.Panel(self.buffer.values[:, where, :], self.items, + major_axis, self.minor_axis, dtype=self.dtype) + + def set_current(self, panel): + """ + Set the values stored in our current in-view data to be values of the + passed panel. The passed panel must have the same indices as the panel + that would be returned by self.get_current. + """ + where = slice(self._oldest_frame_idx, self._pos) + self.buffer.values[:, where, :] = panel.values + + def current_dates(self): + where = slice(self._oldest_frame_idx, self._pos) + return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc') + + def _roll_data(self): + """ + Roll window worth of data up to position zero. 
+ Save the effort of having to expensively roll at each iteration + """ + + self.buffer.values[:, :self._window, :] = \ + self.buffer.values[:, -self._window:, :] + self.date_buf[:self._window] = self.date_buf[-self._window:] + self._pos = self._window + + @property + def window_length(self): + return self._window + + +class MutableIndexRollingPanel(object): + """ + A version of RollingPanel that exists for backwards compatibility with + batch_transform. This is a copy to allow behavior of RollingPanel to drift + away from this without breaking this class. + + This code should be considered frozen, and should not be used in the + future. Instead, see RollingPanel. + """ def __init__(self, window, items, sids, cap_multiple=2, dtype=np.float64): - self.pos = 0 - self.window = window + self._pos = 0 + self._window = window self.items = _ensure_index(items) self.minor_axis = _ensure_index(sids) @@ -49,7 +190,7 @@ class RollingPanel(object): self.buffer = self._create_buffer() def _oldest_frame_idx(self): - return max(self.pos - self.window, 0) + return max(self._pos - self._window, 0) def oldest_frame(self): """ @@ -70,24 +211,13 @@ class RollingPanel(object): ) return panel - def add_frame(self, tick, frame): - """ - """ - if self.pos == self.cap: - self._roll_data() - - self.buffer.loc[:, self.pos, :] = frame.T.astype(self.dtype) - self.date_buf[self.pos] = tick - - self.pos += 1 - def get_current(self): """ Get a Panel that is the current data in view. It is not safe to persist these objects because internal data might change """ - where = slice(self._oldest_frame_idx(), self.pos) + where = slice(self._oldest_frame_idx(), self._pos) major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc') return pd.Panel(self.buffer.values[:, where, :], self.items, major_axis, self.minor_axis, dtype=self.dtype) @@ -98,11 +228,11 @@ class RollingPanel(object): passed panel. 
The passed panel must have the same indices as the panel that would be returned by self.get_current. """ - where = slice(self._oldest_frame_idx(), self.pos) + where = slice(self._oldest_frame_idx(), self._pos) self.buffer.values[:, where, :] = panel.values def current_dates(self): - where = slice(self._oldest_frame_idx(), self.pos) + where = slice(self._oldest_frame_idx(), self._pos) return pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc') def _roll_data(self): @@ -111,42 +241,32 @@ class RollingPanel(object): Save the effort of having to expensively roll at each iteration """ - self.buffer.values[:, :self.window, :] = \ - self.buffer.values[:, -self.window:, :] - self.date_buf[:self.window] = self.date_buf[-self.window:] - self.pos = self.window - - -class MutableIndexRollingPanel(RollingPanel): - """ - Subclass of RollingPanel that mutates its indices in response to - newly-added frames. Exists primarily to maintain backward-compatibility - with the semantics of RollingPanel expected by BatchTransform. - - This class is likely to be deprecated and/or removed in future versions. 
- """ + self.buffer.values[:, :self._window, :] = \ + self.buffer.values[:, -self._window:, :] + self.date_buf[:self._window] = self.date_buf[-self._window:] + self._pos = self._window def add_frame(self, tick, frame): """ """ - if self.pos == self.cap: + if self._pos == self.cap: self._roll_data() if set(frame.columns).difference(set(self.minor_axis)) or \ set(frame.index).difference(set(self.items)): self._update_buffer(frame) - self.buffer.loc[:, self.pos, :] = frame.T.astype(self.dtype) - self.date_buf[self.pos] = tick + self.buffer.loc[:, self._pos, :] = frame.T.astype(self.dtype) + self.date_buf[self._pos] = tick - self.pos += 1 + self._pos += 1 def _update_buffer(self, frame): # Get current frame as we only need to care about the data that is in # the active window old_buffer = self.get_current() - if self.pos >= self.window: + if self._pos >= self._window: # Don't count the last major_axis entry if we're past our window, # since it's about to roll off the end of the panel. old_buffer = old_buffer.iloc[:, 1:, :]