From b6e5345893440dcff1a40f9f4d6dbe9b6632e160 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 30 May 2014 18:24:24 -0400 Subject: [PATCH 1/6] ENH: Enhancements to `TradingEnvironment`. Adds a suite of new functions for querying data from the trading calendar. These include: `previous_trading_day` `minutes_for_days_in_range` (minutely version of `days_in_range`) `previous_open_and_close` (inverse of `next_open_and_close`) `next_market_minute` `previous_market_minute` `open_close_window` (get a range of opens/closes with slicing semantics) `market_minute_window` (get a range of minutes with slicing semantics) Also refactors `test_finance` to move `TradingEnvironment` tests into their own TestCase. --- tests/test_finance.py | 263 +++++++++++++++++++++++++------------ zipline/finance/trading.py | 122 ++++++++++++++++- 2 files changed, 301 insertions(+), 84 deletions(-) diff --git a/tests/test_finance.py b/tests/test_finance.py index 7eec21b8..a9fa724d 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -40,6 +40,7 @@ from zipline.finance.blotter import Blotter from zipline.gens.composites import date_sorted_sources from zipline.finance import trading +from zipline.finance.trading import TradingEnvironment from zipline.finance.execution import MarketOrder, LimitOrder from zipline.finance.trading import SimulationParameters @@ -80,88 +81,6 @@ class FinanceTestCase(TestCase): self.assertTrue(trade.dt > prev.dt) prev = trade - @timed(DEFAULT_TIMEOUT) - def test_trading_environment(self): - # holidays taken from: http://www.nyse.com/press/1191407641943.html - new_years = datetime(2008, 1, 1, tzinfo=pytz.utc) - mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc) - presidents = datetime(2008, 2, 18, tzinfo=pytz.utc) - good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc) - memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc) - july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc) - labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc) - tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc) - christmas = datetime(2008, 5, 25, tzinfo=pytz.utc) - a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc) - a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc) - holidays = [ - new_years, - mlk_day, - presidents, - good_friday, - memorial_day, - july_4th, - labor_day, - tgiving, - christmas, - a_saturday, - a_sunday - ] - - for holiday in holidays: - self.assertTrue(not trading.environment.is_trading_day(holiday)) - - first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc) - last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc) - workdays = [first_trading_day, last_trading_day] - - for workday in workdays: - self.assertTrue(trading.environment.is_trading_day(workday)) - - def test_simulation_parameters(self): - env = SimulationParameters( - period_start=datetime(2008, 1, 1, tzinfo=pytz.utc), - period_end=datetime(2008, 12, 31, tzinfo=pytz.utc), - capital_base=100000, - ) - - self.assertTrue(env.last_close.month == 12) - self.assertTrue(env.last_close.day == 31) - - @timed(DEFAULT_TIMEOUT) - def test_sim_params_days_in_period(self): - - # January 2008 - # Su Mo Tu We Th Fr Sa - # 1 2 3 4 5 - # 6 7 8 9 10 11 12 - # 13 14 15 16 17 18 19 - # 20 21 22 23 24 25 26 - # 27 28 29 30 31 - - env = SimulationParameters( - period_start=datetime(2007, 12, 31, tzinfo=pytz.utc), - period_end=datetime(2008, 1, 7, tzinfo=pytz.utc), - capital_base=100000, - ) - - expected_trading_days = ( - datetime(2007, 12, 31, tzinfo=pytz.utc), - # Skip new years - # holidays taken from: http://www.nyse.com/press/1191407641943.html - datetime(2008, 1, 2, tzinfo=pytz.utc), - datetime(2008, 1, 3, tzinfo=pytz.utc), - datetime(2008, 1, 4, tzinfo=pytz.utc), - # Skip Saturday - # Skip Sunday - datetime(2008, 1, 7, tzinfo=pytz.utc) - ) - - num_expected_trading_days = 5 - self.assertEquals(num_expected_trading_days, env.days_in_period) - np.testing.assert_array_equal(expected_trading_days, - env.trading_days.tolist()) - @timed(EXTENDED_TIMEOUT) def test_full_zipline(self): # provide enough trades to ensure all orders are filled. @@ -429,3 +348,183 @@ class FinanceTestCase(TestCase): self.assertEqual(300, fls_order['amount']) self.assertEqual(3.33, fls_order['limit']) self.assertEqual(2, fls_order['sid']) + + +class TradingEnvironmentTestCase(TestCase): + """ + Tests for date management utilities in zipline.finance.trading. + """ + + def setUp(self): + setup_logger(self) + + def tearDown(self): + teardown_logger(self) + + @classmethod + def setUpClass(cls): + cls.env = TradingEnvironment() + + @timed(DEFAULT_TIMEOUT) + def test_is_trading_day(self): + # holidays taken from: http://www.nyse.com/press/1191407641943.html + new_years = datetime(2008, 1, 1, tzinfo=pytz.utc) + mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc) + presidents = datetime(2008, 2, 18, tzinfo=pytz.utc) + good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc) + memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc) + july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc) + labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc) + tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc) + christmas = datetime(2008, 5, 25, tzinfo=pytz.utc) + a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc) + a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc) + holidays = [ + new_years, + mlk_day, + presidents, + good_friday, + memorial_day, + july_4th, + labor_day, + tgiving, + christmas, + a_saturday, + a_sunday + ] + + for holiday in holidays: + self.assertTrue(not self.env.is_trading_day(holiday)) + + first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc) + last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc) + workdays = [first_trading_day, last_trading_day] + + for workday in workdays: + self.assertTrue(self.env.is_trading_day(workday)) + + def test_simulation_parameters(self): + env = SimulationParameters( + period_start=datetime(2008, 1, 1, tzinfo=pytz.utc), + period_end=datetime(2008, 12, 31, tzinfo=pytz.utc), + capital_base=100000, + ) + + self.assertTrue(env.last_close.month == 12) + self.assertTrue(env.last_close.day == 31) + + @timed(DEFAULT_TIMEOUT) + def test_sim_params_days_in_period(self): + + # January 2008 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + env = SimulationParameters( + period_start=datetime(2007, 12, 31, tzinfo=pytz.utc), + period_end=datetime(2008, 1, 7, tzinfo=pytz.utc), + capital_base=100000, + ) + + expected_trading_days = ( + datetime(2007, 12, 31, tzinfo=pytz.utc), + # Skip new years + # holidays taken from: http://www.nyse.com/press/1191407641943.html + datetime(2008, 1, 2, tzinfo=pytz.utc), + datetime(2008, 1, 3, tzinfo=pytz.utc), + datetime(2008, 1, 4, tzinfo=pytz.utc), + # Skip Saturday + # Skip Sunday + datetime(2008, 1, 7, tzinfo=pytz.utc) + ) + + num_expected_trading_days = 5 + self.assertEquals(num_expected_trading_days, env.days_in_period) + np.testing.assert_array_equal(expected_trading_days, + env.trading_days.tolist()) + + @timed(DEFAULT_TIMEOUT) + def test_market_minute_window(self): + + # January 2008 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + us_east = pytz.timezone('US/Eastern') + utc = pytz.utc + + # 10:01 AM Eastern on January 7th.. + start = us_east.localize(datetime(2008, 1, 7, 10, 1)) + utc_start = start.astimezone(utc) + + # Get the next 10 minutes + minutes = self.env.market_minute_window( + utc_start, 10, + ) + self.assertEqual(len(minutes), 10) + for i in range(10): + self.assertEqual(minutes[i], utc_start + timedelta(minutes=i)) + + # Get the previous 10 minutes. + minutes = self.env.market_minute_window( + utc_start, 10, step=-1, + ) + self.assertEqual(len(minutes), 10) + for i in range(10): + self.assertEqual(minutes[i], utc_start + timedelta(minutes=-i)) + + # Get the next 900 minutes, including utc_start, rolling over into the + # next two days. + # Should include: + # Today: 10:01 AM -> 4:00 PM (360 minutes) + # Tomorrow: 9:31 AM -> 4:00 PM (390 minutes, 750 total) + # Last Day: 9:31 AM -> 12:00 PM (150 minutes, 900 total) + minutes = self.env.market_minute_window( + utc_start, 900, + ) + today = self.env.market_minutes_for_day(start)[30:] + tomorrow = self.env.market_minutes_for_day( + start + timedelta(days=1) + ) + last_day = self.env.market_minutes_for_day( + start + timedelta(days=2))[:150] + + self.assertEqual(len(minutes), 900) + self.assertEqual(minutes[0], utc_start) + self.assertTrue(all(today == minutes[:360])) + self.assertTrue(all(tomorrow == minutes[360:750])) + self.assertTrue(all(last_day == minutes[750:])) + + # Get the previous 801 minutes, including utc_start, rolling over into + # Friday the 4th and Thursday the 3rd. + # Should include: + # Today: 10:01 AM -> 9:31 AM (31 minutes) + # Friday: 4:00 PM -> 9:31 AM (390 minutes, 421 total) + # Thursday: 4:00 PM -> 9:41 AM (380 minutes, 801 total) + minutes = self.env.market_minute_window( + utc_start, 801, step=-1, + ) + + today = self.env.market_minutes_for_day(start)[30::-1] + # minus an extra two days from each of these to account for the two + # weekend days we skipped + friday = self.env.market_minutes_for_day( + start + timedelta(days=-3), + )[::-1] + thursday = self.env.market_minutes_for_day( + start + timedelta(days=-4), + )[:9:-1] + + self.assertEqual(len(minutes), 801) + self.assertEqual(minutes[0], utc_start) + self.assertTrue(all(today == minutes[:31])) + self.assertTrue(all(friday == minutes[31:421])) + self.assertTrue(all(thursday == minutes[421:])) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 098237c5..62f392a0 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -18,6 +18,7 @@ import logbook import datetime import pandas as pd +import numpy as np from zipline.data.loader import load_market_data from zipline.utils import tradingcalendar @@ -174,11 +175,39 @@ class TradingEnvironment(object): return None + def previous_trading_day(self, test_date): + dt = self.normalize_date(test_date) + delta = datetime.timedelta(days=-1) + + while self.first_trading_day < test_date: + dt += delta + if dt in self.trading_days: + return dt + + return None + def days_in_range(self, start, end): mask = ((self.trading_days >= start) & (self.trading_days <= end)) return self.trading_days[mask] + def minutes_for_days_in_range(self, start, end): + """ + Get all market minutes for the days between start and end, inclusive. + """ + start_date = self.normalize_date(start) + end_date = self.normalize_date(end) + + all_minutes = [] + for day in self.days_in_range(start_date, end_date): + day_minutes = self.market_minutes_for_day(day) + all_minutes.append(day_minutes) + + # Concatenate all minutes and truncate minutes before start/after end. + return pd.DatetimeIndex( + np.concatenate(all_minutes), copy=False, tz='UTC', + ) + def next_open_and_close(self, start_date): """ Given the start_date, returns the next open and close of @@ -193,15 +222,104 @@ Last successful date: %s" % self.last_trading_day) return self.get_open_and_close(next_open) + def previous_open_and_close(self, start_date): + """ + Given the start_date, returns the previous open and close of the + market. + """ + previous = self.previous_trading_day(start_date) + + if previous is None: + raise NoFurtherDataError( + "Attempt to backtest beyond available history. " + "First successful date: %s" % self.first_trading_day) + return self.get_open_and_close(previous) + + def next_market_minute(self, start): + """ + Get the next market minute after @start. This is either the immediate + next minute, or the open of the next market day after start. + """ + next_minute = start + datetime.timedelta(minutes=1) + if self.is_market_hours(next_minute): + return next_minute + return self.next_open_and_close(start)[0] + + def previous_market_minute(self, start): + """ + Get the next market minute before @start. This is either the immediate + previous minute, or the close of the market day before start. + """ + prev_minute = start - datetime.timedelta(minutes=1) + if self.is_market_hours(prev_minute): + return prev_minute + return self.previous_open_and_close(start)[1] + def get_open_and_close(self, day): todays_minutes = self.open_and_closes.ix[day.date()] return todays_minutes['market_open'], todays_minutes['market_close'] - def market_minutes_for_day(self, midnight): - market_open, market_close = self.get_open_and_close(midnight) + def market_minutes_for_day(self, stamp): + market_open, market_close = self.get_open_and_close(stamp) return pd.date_range(market_open, market_close, freq='T') + def open_close_window(self, start, count, offset=0, step=1): + """ + Return a DataFrame containing `count` market opens and closes, + beginning with `start` + `offset` days and continuing `step` minutes at + a time. + """ + # TODO: Correctly handle end of data. + start_idx = self.get_index(start) + offset + stop_idx = start_idx + (count * step) + + index = np.arange(start_idx, stop_idx, step) + + return self.open_and_closes.iloc[index] + + def market_minute_window(self, start, count, step=1): + """ + Return a DatetimeIndex containing `count` market minutes, starting with + `start` and continuing `step` minutes at a time. + """ + if not self.is_market_hours(start): + raise ValueError("market_minute_window starting at " + "non-market time {minute}".format(minute=start)) + + all_minutes = [] + + current_day_minutes = self.market_minutes_for_day(start) + first_minute_idx = current_day_minutes.searchsorted(start) + minutes_in_range = current_day_minutes[first_minute_idx::step] + + # Build up list of lists of days' market minutes until we have count + # minutes stored altogether. + while True: + + if len(minutes_in_range) >= count: + # Truncate off extra minutes + minutes_in_range = minutes_in_range[:count] + + all_minutes.append(minutes_in_range) + count -= len(minutes_in_range) + if count <= 0: + break + + if step > 0: + start, _ = self.next_open_and_close(start) + current_day_minutes = self.market_minutes_for_day(start) + else: + _, start = self.previous_open_and_close(start) + current_day_minutes = self.market_minutes_for_day(start) + + minutes_in_range = current_day_minutes[::step] + + # Concatenate all the accumulated minutes. + return pd.DatetimeIndex( + np.concatenate(all_minutes), copy=False, tz='UTC', + ) + def trading_day_distance(self, first_date, second_date): first_date = self.normalize_date(first_date) second_date = self.normalize_date(second_date) From bad4c9a4398d7173ad6cc0f3e3d63f636ecc4c98 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 30 May 2014 18:30:25 -0400 Subject: [PATCH 2/6] ENH: Prep work for supporting '1m' history. Overhauls `HistoryContainer` in prep for support of more than one frequency. Major changes: - Methods/variables referring to "day" have been renamed/generalized. - `current_day_panel` became `buffer_panel`, which is now a `RollingPanel` - `prior_day_panel` became a dictionary mapping `Frequency` objects to "digest panels", which are instances of `RollingPanel`. - Hard-coded daily rollover replaced with a notion of a "current window" for each unique frequency managed by the panel. - When the end of the current window is reached for a given frequency, we compute an aggregate bar (code refers to this as a "digest"), which is appended to a panel associated with that frequency. - Window rollover dates are managed by a pair of dictionaries, `cur_window_starts` and `cur_window_closes`. The `Frequency` class is responsible for computing window bounds based on the open/close of the previous window. - Semantic change to the `open_price` field: `open_price` now always contains the price of the first trade occurring in the given window. Previously it contained the price of the first minute in the window, returning NaN it the security happened not to trade in the first minute. --- tests/test_history.py | 6 +- zipline/history/history.py | 253 +++++++++++--- zipline/history/history_container.py | 494 ++++++++++++++++++--------- zipline/utils/data.py | 15 +- 4 files changed, 556 insertions(+), 212 deletions(-) diff --git a/tests/test_history.py b/tests/test_history.py index f4e0e400..3ca7e10b 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -145,7 +145,7 @@ class TestHistoryContainer(TestCase): field='price', ffill=True ) - specs = {hash(spec): spec} + specs = {spec.key_str: spec} initial_sids = [1, ] initial_dt = pd.Timestamp( '2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC') @@ -154,7 +154,7 @@ class TestHistoryContainer(TestCase): specs, initial_sids, initial_dt) bar_data = BarData() - + container.update(bar_data, initial_dt) # Since there was no backfill because of no db. # And no first bar of data, so all values should be nans. prices = container.get_history(spec, initial_dt) @@ -169,7 +169,6 @@ class TestHistoryContainer(TestCase): 'price': 10, 'dt': second_bar_dt } - container.update(bar_data, second_bar_dt) prices = container.get_history(spec, second_bar_dt) @@ -288,7 +287,6 @@ def handle_data(context, data): # 12 13 14 15 16 17 18 # 19 20 21 22 23 24 25 # 26 27 28 29 30 31 - start = pd.Timestamp('2006-03-20', tz='UTC') end = pd.Timestamp('2006-03-21', tz='UTC') diff --git a/zipline/history/history.py b/zipline/history/history.py index dcf49694..9e7853de 100644 --- a/zipline/history/history.py +++ b/zipline/history/history.py @@ -32,19 +32,183 @@ class Frequency(object): Represents how the data is sampled, as specified by the algoscript via units like "1d", "1m", etc. - Currently only one frequency is supported, "1d" - "1d" provides data keyed by closing, and the last minute of the current - day. + Currently only two frequencies are supported, "1d" and "1m" + + - "1d" provides data at daily frequency, with the latest bar aggregating + the elapsed minutes of the (incomplete) current day + - "1m" provides data at minute frequency """ + SUPPORTED_FREQUENCIES = frozenset({'1d'}) + MAX_MINUTES = {'m': 1, 'd': 390} def __init__(self, freq_str): + + if freq_str not in self.SUPPORTED_FREQUENCIES: + raise ValueError( + "history frequency must be in {supported}".format( + supported=self.SUPPORTED_FREQUENCIES, + )) # The string the at the algoscript specifies. # Hold onto to use a key for caching. self.freq_str = freq_str + # num - The number of units of the frequency. # unit_str - The unit type, e.g. 'd' self.num, self.unit_str = parse_freq_str(freq_str) + def next_window_start(self, previous_window_close): + """ + Get the first minute of the window starting after a window that + finished on @previous_window_close. + """ + if self.unit_str == 'd': + return self.next_day_window_start(previous_window_close) + elif self.unit_str == 'm': + return self.next_minute_window_start(previous_window_close) + + @staticmethod + def next_day_window_start(previous_window_close): + """ + Get the next day window start after @previous_window_close. This is + defined as the first market open strictly greater than + @previous_window_close. + """ + env = trading.environment + next_open, _ = env.next_open_and_close(previous_window_close) + return next_open + + @staticmethod + def next_minute_window_start(previous_window_close): + """ + Get the next minute window start after @previous_window_close. This is + defined as the first market minute strictly greater than + @previous_window_close. + """ + env = trading.environment + return env.next_market_minute(previous_window_close) + + def window_open(self, window_close): + """ + For a period ending on `window_end`, calculate the date of the first + minute bar that should be used to roll a digest for this frequency. + """ + if self.unit_str == 'd': + return self.day_window_open(window_close, self.num) + elif self.unit_str == 'm': + return self.minute_window_open(window_close, self.num) + + def window_close(self, window_start): + """ + For a period starting on `window_start`, calculate the date of the last + minute bar that should be used to roll a digest for this frequency. + """ + if self.unit_str == 'd': + return self.day_window_close(window_start, self.num) + elif self.unit_str == 'm': + return self.minute_window_close(window_start, self.num) + + @staticmethod + def day_window_open(window_close, num_days): + """ + Get the first minute for a daily window of length @num_days with last + minute @window_close. This is calculated by searching backward until + @num_days market_closes are encountered. + """ + env = trading.environment + open_ = env.open_close_window( + window_close, + 1, + offset=-(num_days - 1) + ).market_open.iloc[0] + return open_ + + @staticmethod + def minute_window_open(window_close, num_minutes): + """ + Get the first minute for a minutely window of length @num_minutes with + last minute @window_close. + + This is defined as window_close if num_minutes == 1, and otherwise as + the N-1st market minute after @window_start. + """ + if num_minutes == 1: + # Short circuit this case. + return window_close + + env = trading.environment + return env.market_minute_window(window_close, count=-num_minutes)[-1] + + @staticmethod + def day_window_close(window_start, num_days): + """ + Get the last minute for a daily window of length @num_days with first + minute @window_start. This is calculated by searching forward until + @num_days market closes are encountered. + + Examples: + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 1 + --> window_close = Thursday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 3:59 AM EST + num_days = 1 + --> window_close = Thursday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 2 + --> window_close = Friday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 3 + --> window_close = Monday March 6th, 2006, 4:00 PM EST + + # Day before July 4th is an early close + window_start = Wednesday July 3rd, 2013, 9:31 AM EST + num_days = 1 + --> window_close = Wednesday July 3rd, 2013, 1:00 PM EST + """ + env = trading.environment + close = env.open_close_window( + window_start, + 1, + offset=num_days - 1 + ).market_close.iloc[0] + return close + + @staticmethod + def minute_window_close(window_start, num_minutes): + """ + Get the last minute for a minutely window of length @num_minutes with + first minute @window_start. + + This is defined as window_start if num_minutes == 1, and otherwise as + the N-1st market minute after @window_start. + """ + if num_minutes == 1: + # Short circuit this case. + return window_start + + env = trading.environment + return env.market_minute_window(window_start, count=num_minutes)[-1] + + @property + def max_minutes(self): + """ + The maximum number of minutes required to roll a bar at this frequency. + """ + return self.MAX_MINUTES[self.unit_str] * self.num + + def __eq__(self, other): + return self.freq_str == other.freq_str + + def __hash__(self): + return hash(self.freq_str) + + def __repr__(self): + return ''.join([str(self.__class__.__name__), + "('", self.freq_str, "')"]) + class HistorySpec(object): """ @@ -75,61 +239,64 @@ class HistorySpec(object): # Whether or not to forward fill the nan data. self.ffill = ffill - # How many trading days the spec needs to look back. - # Used by index creation to see how large of an overarching window - # is needed. - self.days_needed = calculate_days_needed( - self.bar_count, self.frequency) - # Calculate the cache key string once. self.key_str = self.spec_key( bar_count, frequency.freq_str, field, ffill) + def __repr__(self): + return ''.join([self.__class__.__name__, "('", self.key_str, "')"]) -def calculate_days_needed(bar_count, freq): - """ Returns number trading days needed. - Overshoots so that we more than enough to sample from the current - frequency slot plus previous ones. + +def days_index_at_dt(history_spec, algo_dt): """ - if freq.unit_str == 'd': - return bar_count * freq.num - - -def days_index_at_dt(days_needed, algo_dt): - """ - The timestamps of previous days closes with the size of @days_needed - at @algo_dt. + Get the index of a frame to be used for a get_history call with daily + frequency. """ env = trading.environment + # Get the previous (bar_count - 1) days' worth of market closes. + day_delta = (history_spec.bar_count - 1) * history_spec.frequency.num + market_closes = env.open_close_window( + algo_dt, + day_delta, + offset=(-day_delta), + step=history_spec.frequency.num, + ).market_close - latest_algo_dt = algo_dt - - current_index = env.open_and_closes.index.searchsorted(algo_dt.date()) - - previous_days_num = days_needed - 1 - - previous_days = env.open_and_closes['market_close'][ - current_index - previous_days_num:current_index] - + # Append the current algo_dt as the last index value. # Using the 'rawer' numpy array values here because of a bottleneck # that appeared when using DatetimeIndex - return np.append(previous_days.values, latest_algo_dt) + return np.append(market_closes.values, algo_dt) + + +def minutes_index_at_dt(history_spec, algo_dt): + """ + Get the index of a frame to be used for a get_history_call with minutely + frequency. + """ + # TODO: This is almost certainly going to be too slow for production. + env = trading.environment + return env.market_minute_window( + algo_dt, + history_spec.bar_count, + step=-1, + )[::-1] def index_at_dt(history_spec, algo_dt): """ - The index, including @algo_dt at the given @algo_dt for the count - and frequency of the @history_spec. + Returns index of a frame returned by get_history() with the given + history_spec and algo_dt. + + The resulting index `@history_spec.bar_count` bars, increasing in units of + `@history_spec.frequency`, terminating at the given @algo_dt. + + Note: The last bar of the returned frame represents an as-of-yet incomplete + time window, so the delta between the last and second-to-last bars is + usually always less than `@history_spec.frequency` for frequencies greater + than 1m. """ - days_index = days_index_at_dt(history_spec.days_needed, algo_dt) - frequency = history_spec.frequency - if frequency.unit_str == 'd': - - index_of_algo_dt = days_index.searchsorted(algo_dt) - - start_index = index_of_algo_dt + 1 - history_spec.bar_count - end_index = index_of_algo_dt + 1 - - return days_index[start_index:end_index] + return days_index_at_dt(history_spec, algo_dt) + elif frequency.unit_str == 'm': + return minutes_index_at_dt(history_spec, algo_dt) diff --git a/zipline/history/history_container.py b/zipline/history/history_container.py index e7b74b86..ce2da464 100644 --- a/zipline/history/history_container.py +++ b/zipline/history/history_container.py @@ -12,14 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from itertools import groupby import numpy as np import pandas as pd -from six import itervalues + +from six import itervalues, iteritems, iterkeys from . history import ( index_at_dt, - days_index_at_dt, ) from zipline.finance import trading @@ -30,38 +31,79 @@ from zipline.utils.data import RollingPanel CLOSING_PRICE_FIELDS = {'price', 'close_price'} -def create_initial_day_panel(days_needed, fields, sids, dt): - index = days_index_at_dt(days_needed, dt) - # Use original index in case of 1 bar. - if days_needed != 1: - index = index[:-1] - window = len(index) - rp = RollingPanel(window, fields, sids) - for i, day in enumerate(index): - rp.index_buf[i] = day - rp.pos = window - return rp +def ffill_buffer_from_prior_values(field, + buffer_frame, + digest_frame, + pre_digest_values): + """ + Forward-fill a buffer frame, falling back to the end-of-period values of a + digest frame if the buffer frame has leading NaNs. + """ + + if field == 'volume': + # Volume is never forward-filled. + return buffer_frame + + # Get values which are NaN at the beginning of the period. + first_bar = buffer_frame.iloc[0] + + def iter_nan_sids(): + """ + Helper for iterating over the remaining nan sids in first_bar. + """ + return (sid for sid in first_bar[first_bar.isnull()].index) + + # Try to fill with the last entry from the digest frame. + if digest_frame is not None: + # We don't store a digest frame for frequencies that only have a bar + # count of 1. + for sid in iter_nan_sids(): + buffer_frame[sid][0] = digest_frame.ix[-1, sid] + + # If we still have nan sids, try to fill with pre_digest_values. + for sid in iter_nan_sids(): + prior_sid_value = pre_digest_values[field].get(sid) + if prior_sid_value: + # If the prior value is greater than the timestamp of our first + # bar. + if prior_sid_value.get('dt', first_bar.name) > first_bar.name: + buffer_frame[sid][0] = prior_sid_value.get('value', np.nan) + + return buffer_frame.ffill() -def create_current_day_panel(fields, sids, dt): - # Can't use open_and_close since need to create enough space for a full - # day, even on a half day. - # Can now use mkt open and close, since we don't roll - env = trading.environment - index = env.market_minutes_for_day(dt) - return pd.Panel(items=fields, minor_axis=sids, major_axis=index) +def freq_str_and_bar_count(history_spec): + """ + Helper for getting the frequency string from a history spec. + """ + return (history_spec.frequency.freq_str, history_spec.bar_count) -def ffill_day_frame(field, day_frame, prior_day_frame): - # get values which are nan-at the beginning of the day - # and attempt to fill with the last close - first_bar = day_frame.ix[0] - nan_sids = first_bar[np.isnan(first_bar)] - for sid, _ in nan_sids.iterkv(): - day_frame[sid][0] = prior_day_frame.ix[-1, sid] - if field != 'volume': - day_frame = day_frame.ffill() - return day_frame +def group_by_frequency(history_specs): + """ + Takes an iterable of history specs and returns a dictionary mapping unique + frequencies to a list of specs with that frequency. + + Within each list, the HistorySpecs are sorted by ascending bar count. + + Example: + + [HistorySpec(3, '1d', 'price', True), + HistorySpec(2, '2d', 'open', True), + HistorySpec(2, '1d', 'open', False), + HistorySpec(5, '1m', 'open', True)] + + yields + + {Frequency('1d') : [HistorySpec(2, '1d', 'open', False)], + HistorySpec(3, '1d', 'price', True), + Frequency('2d') : [HistorySpec(2, '2d', 'open', True)], + Frequency('1m') : [HistorySpec(5, '1m', 'open', True)]} + """ + return {key: list(group) + for key, group in groupby( + sorted(history_specs, key=freq_str_and_bar_count), + key=lambda spec: spec.frequency)} class HistoryContainer(object): @@ -78,35 +120,105 @@ class HistoryContainer(object): # History specs to be served by this container. self.history_specs = history_specs - - # The overaching panel needs to be large enough to contain the - # largest history spec - self.max_days_needed = max(spec.days_needed for spec - in itervalues(history_specs)) + self.frequency_groups = \ + group_by_frequency(itervalues(self.history_specs)) # The set of fields specified by all history specs self.fields = set(spec.field for spec in itervalues(history_specs)) - self.prior_day_panel = create_initial_day_panel( - self.max_days_needed, self.fields, initial_sids, initial_dt) + # This panel contains raw minutes for periods that haven't been fully + # completed. When a frequency period rolls over, these minutes are + # digested using some sort of aggregation call on the panel (e.g. `sum` + # for volume, `max` for high, `min` for low, etc.). + self.buffer_panel = self.create_buffer_panel( + initial_sids, + initial_dt, + ) - # This panel contains the minutes for the current day. - # The value that is used is some sort of aggregation call on the - # panel, e.g. `sum` for volume, `max` for high, etc. - self.current_day_panel = create_current_day_panel( - self.fields, initial_sids, initial_dt) + # Dictionaries with Frequency objects as keys. + self.digest_panels, self.cur_window_starts, self.cur_window_closes = \ + self.create_digest_panels(initial_sids, initial_dt) + + # Populating initial frames here, so that the cost of creating the + # initial frames does not show up when profiling. These frames are + # cached since mid-stream creation of containing data frames on every + # bar is expensive. + self.create_return_frames(initial_dt) # Helps prop up the prior day panel against having a nan, when the data # has been seen. self.last_known_prior_values = {field: {} for field in self.fields} - # Populating initial frames here, so that the cost of creating the - # initial frames does not show up when profiling get_y - # These frames are cached since mid-stream creation of containing - # data frames on every bar is expensive. - self.return_frames = {} + @property + def unique_frequencies(self): + """ + Return an iterator over all the unique frequencies serviced by this + container. + """ + return iterkeys(self.frequency_groups) - self.create_return_frames(initial_dt) + def create_digest_panels(self, initial_sids, initial_dt): + """ + Initialize a RollingPanel for each unique panel frequency being stored + by this container. Each RollingPanel pre-allocates enough storage + space to service the highest bar-count of any history call that it + serves. + + Relies on the fact that group_by_frequency sorts the value lists by + ascending bar count. + """ + # Map from frequency -> first/last minute of the next digest to be + # rolled for that frequency. + first_window_starts = {} + first_window_closes = {} + + # Map from frequency -> digest_panels. + panels = {} + for freq, specs in iteritems(self.frequency_groups): + + # Relying on the sorting of group_by_frequency to get the spec + # requiring the largest number of bars. + largest_spec = specs[-1] + if largest_spec.bar_count == 1: + # No need to allocate a digest panel; this frequency will only + # ever use data drawn from self.buffer_panel. + env = trading.environment + first_window_closes[freq] = \ + env.get_open_and_close(initial_dt)[1] + first_window_starts[freq] = \ + freq.window_open(first_window_closes[freq]) + continue + + initial_dates = index_at_dt(largest_spec, initial_dt) + + # Set up dates for our first digest roll, which is keyed to the + # close of the first entry in our initial index. + first_window_closes[freq] = initial_dates[0] + first_window_starts[freq] = freq.window_open(initial_dates[0]) + + rp = RollingPanel(len(initial_dates) - 1, + self.fields, + initial_sids) + + panels[freq] = rp + + return panels, first_window_starts, first_window_closes + + def create_buffer_panel(self, initial_sids, initial_dt): + """ + Initialize a RollingPanel containing enough minutes to service all our + frequencies. + """ + max_bars_needed = max(freq.max_minutes + for freq in self.unique_frequencies) + rp = RollingPanel( + max_bars_needed, + self.fields, + initial_sids, + # Restrict the initial data down to just the fields being used in + # this container. + ) + return rp def create_return_frames(self, algo_dt): """ @@ -114,101 +226,146 @@ class HistoryContainer(object): Called during init and at universe rollovers. """ - for history_spec in itervalues(self.history_specs): - index = index_at_dt(history_spec, algo_dt) - index = pd.to_datetime(index) + self.return_frames = {} + for spec_key, history_spec in iteritems(self.history_specs): + index = pd.to_datetime(index_at_dt(history_spec, algo_dt)) frame = pd.DataFrame( index=index, - columns=map(int, self.current_day_panel.minor_axis.values), + columns=map(int, self.buffer_panel.minor_axis.values), dtype=np.float64) - self.return_frames[history_spec] = frame + self.return_frames[spec_key] = frame + + def buffer_panel_minutes(self, + buffer_panel=None, + earliest_minute=None, + latest_minute=None): + """ + Get the minutes in @buffer_panel between @earliest_minute and + @last_minute, inclusive. + + @buffer_panel can be a RollingPanel or a plain Panel. If a + RollingPanel is supplied, we call `get_current` to extract a Panel + object. If no panel is supplied, we use self.buffer_panel. + + If no value is specified for @earliest_minute, use all the minutes we + have up until @latest minute. + + If no value for @latest_minute is specified, use all values up until + the latest minute. + """ + buffer_panel = buffer_panel or self.buffer_panel + if isinstance(buffer_panel, RollingPanel): + buffer_panel = buffer_panel.get_current() + + return buffer_panel.ix[:, earliest_minute:latest_minute, :] def update(self, data, algo_dt): """ - Takes the bar at @algo_dt's @data and adds to the current day panel. + Takes the bar at @algo_dt's @data, checks to see if we need to roll any + new digests, then adds new data to the buffer panel. """ - self.check_and_roll(algo_dt) + self.update_digest_panels(algo_dt, self.buffer_panel) fields = self.fields - field_data = {sid: {field: bar[field] for field in fields} - for sid, bar in data.iteritems() - if (bar - and - bar['dt'] == algo_dt - and - # Only use data which is keyed in the data panel. - # Prevents crashes due to custom data. - sid in self.current_day_panel.minor_axis)} - field_frame = pd.DataFrame(field_data) - self.current_day_panel.ix[:, algo_dt, :] = field_frame.T + frame = pd.DataFrame( + {sid: {field: bar[field] for field in fields} + for sid, bar in data.iteritems() + if (bar + and + bar['dt'] == algo_dt + and + # Only use data which is keyed in the data panel. + # Prevents crashes due to custom data. + sid in self.buffer_panel.minor_axis)}) + self.buffer_panel.add_frame(algo_dt, frame) + + def update_digest_panels(self, algo_dt, buffer_panel): + """ + Check whether @algo_dt is greater than cur_window_close for any of our + frequencies. If so, roll a digest for that frequency using data drawn + from @buffer panel and insert it into the appropriate digest panels. + """ + for frequency in self.unique_frequencies: + + # We don't keep a digest panel if we only have a length-1 history + # spec for a given frequency + digest_panel = self.digest_panels.get(frequency, None) + + while algo_dt > self.cur_window_closes[frequency]: + + earliest_minute = self.cur_window_starts[frequency] + latest_minute = self.cur_window_closes[frequency] + minutes_to_process = self.buffer_panel_minutes( + buffer_panel, + earliest_minute=earliest_minute, + latest_minute=latest_minute, + ) + + # Create a digest from minutes_to_process and add it to + # digest_panel. + self.roll(frequency, + digest_panel, + minutes_to_process, + latest_minute) + + # Update panel start/close for this frequency. + self.cur_window_starts[frequency] = \ + frequency.next_window_start(latest_minute) + self.cur_window_closes[frequency] = \ + frequency.window_close(self.cur_window_starts[frequency]) + + def roll(self, frequency, digest_panel, buffer_minutes, digest_dt): + """ + Package up minutes in @buffer_minutes insert that bar into + @digest_panel at index @last_minute, and update + self.cur_window_{starts|closes} for the given frequency. + """ + if digest_panel is None: + # This happens if the only spec we have at this frequency has a bar + # count of 1. + return - def roll(self, roll_dt): - env = trading.environment - # This should work for price, but not others, e.g. - # open. - # Get the most recent value. rolled = pd.DataFrame( - index=self.current_day_panel.items, - columns=self.current_day_panel.minor_axis) + index=self.fields, + columns=buffer_minutes.minor_axis) - for field in self.fields: - if field in CLOSING_PRICE_FIELDS: - # Use the last price. - prices = self.current_day_panel.ffill().ix[field, -1, :] - rolled.ix[field] = prices - elif field == 'open_price': - # Use the first price. - opens = self.current_day_panel.ix['open_price', 0, :] - rolled.ix['open_price'] = opens - elif field == 'volume': - # Volume is the sum of the volumes during the - # course of the day - volumes = self.current_day_panel.ix['volume'].apply(np.sum) - rolled.ix['volume'] = volumes - elif field == 'high': - # Use the highest high. - highs = self.current_day_panel.ix['high'].apply(np.max) - rolled.ix['high'] = highs - elif field == 'low': - # Use the lowest low. - lows = self.current_day_panel.ix['low'].apply(np.min) - rolled.ix['low'] = lows + if len(buffer_minutes.major_axis) > 0: + for field in self.fields: + if field in CLOSING_PRICE_FIELDS: + # Use the last price. + prices = buffer_minutes.ffill().ix[field, -1, :] + rolled.ix[field] = prices + elif field == 'open_price': + # Use the first price. + opens = buffer_minutes.ix['open_price', 0, :] + rolled.ix['open_price'] = opens + elif field == 'volume': + # Volume is the sum of the volumes during the + # course of the day + volumes = buffer_minutes.ix['volume'].apply(np.sum) + rolled.ix['volume'] = volumes + elif field == 'high': + # Use the highest high. + highs = buffer_minutes.ix['high'].apply(np.max) + rolled.ix['high'] = highs + elif field == 'low': + # Use the lowest low. + lows = buffer_minutes.ix['low'].apply(np.min) + rolled.ix['low'] = lows - for sid, value in rolled.ix[field].iterkv(): - if not np.isnan(value): - try: - prior_values = self.last_known_prior_values[field][sid] - except KeyError: - prior_values = {} - self.last_known_prior_values[field][sid] = prior_values - prior_values['dt'] = roll_dt - prior_values['value'] = value + for sid, value in rolled.ix[field].iterkv(): + if not np.isnan(value): + try: + prior_values = \ + self.last_known_prior_values[field][sid] + except KeyError: + prior_values = {} + self.last_known_prior_values[field][sid] = \ + prior_values + prior_values['dt'] = digest_dt + prior_values['value'] = value - self.prior_day_panel.add_frame(roll_dt, rolled) - - # Create a new 'current day' collector. - next_day = env.next_trading_day(roll_dt) - - if next_day: - # Only create the next panel if there is a next day. - # i.e. don't create the next panel on the last day of - # the backest/current day of live trading. - self.current_day_panel = create_current_day_panel( - self.fields, - # Will break on quarter rollover. - self.current_day_panel.minor_axis, - next_day) - - def check_and_roll(self, algo_dt): - """ - Check whether the algo_dt is at the end of a day. - If it is, aggregate the day's minute data and store it in the prior - day panel. - """ - # Use a while loop to account for illiquid bars. - while algo_dt > self.current_day_panel.major_axis[-1]: - roll_dt = self.current_day_panel.major_axis[-1] - self.roll(roll_dt) + digest_panel.add_frame(digest_dt, rolled) def get_history(self, history_spec, algo_dt): """ @@ -217,57 +374,74 @@ class HistoryContainer(object): Selects from the overarching history panel the values for the @history_spec at the given @algo_dt. """ + field = history_spec.field + bar_count = history_spec.bar_count + index = pd.to_datetime(index_at_dt(history_spec, algo_dt)) + return_frame = self.return_frames[history_spec.key_str] - index = index_at_dt(history_spec, algo_dt) - index = pd.to_datetime(index) - - frame = self.return_frames[history_spec] # Overwrite the index. # Not worrying about values here since the values are overwritten # in the next step. - frame.index = index + return_frame.index = index - prior_day_panel = self.prior_day_panel.get_current() - prior_day_frame = prior_day_panel[field].copy() - if history_spec.ffill: - first_bar = prior_day_frame.ix[0] - nan_sids = first_bar[first_bar.isnull()] - for sid, _ in nan_sids.iterkv(): + if bar_count > 1: + # Get the last bar_count - 1 frames from our stored historical + # frames. + digest_panel = self.digest_panels[history_spec.frequency]\ + .get_current() + digest_frame = digest_panel[field].copy().ix[1 - bar_count:] + else: + digest_frame = None + + if digest_frame is not None and history_spec.ffill: + # It's possible that the first bar in our digest frame is storing + # NaN values. If so, check if we've tracked an older value and use + # that as an ffill value for the first bar. + first_bar = digest_frame.ix[0] + nan_sids = first_bar[first_bar.isnull()].index + for sid in nan_sids: try: - if ( - # Only use prior value if it is before the index, - # so that a backfill does not accidentally occur. + # Only use prior value if it is before the index, + # so that a backfill does not accidentally occur. + have_pre_frame_value = ( self.last_known_prior_values[field][sid]['dt'] <= - prior_day_frame.index[0]): - prior_day_frame[sid][0] =\ + digest_frame.index[0] + ) + if have_pre_frame_value: + digest_frame[sid][0] =\ self.last_known_prior_values[field][sid]['value'] except KeyError: # Allow case where there is no previous value. # e.g. with leading nans. pass - prior_day_frame = prior_day_frame.ffill() - frame.ix[:-1] = prior_day_frame.ix[:] + digest_frame = digest_frame.ffill() + return_frame.ix[:-1] = digest_frame.ix[:] + + # Get minutes from our buffer panel to build the last row. + frequency = history_spec.frequency + buffer_frame = self.buffer_panel_minutes( + earliest_minute=self.cur_window_starts[frequency], + )[field].copy() - # Copy the current day frame, since the fill behavior will mutate - # the values in the panel. - current_day_frame = self.current_day_panel[field][:algo_dt].copy() if history_spec.ffill: - current_day_frame = ffill_day_frame(field, - current_day_frame, - prior_day_frame) - + buffer_frame = ffill_buffer_from_prior_values( + field, + buffer_frame, + digest_frame, + self.last_known_prior_values, + ) if field == 'volume': # This works for the day rollup, i.e. '1d', # but '1m' will need to allow for 0 or nan minutes - frame.ix[algo_dt] = current_day_frame.sum() + return_frame.ix[algo_dt] = buffer_frame.sum() elif field == 'high': - frame.ix[algo_dt] = current_day_frame.max() + return_frame.ix[algo_dt] = buffer_frame.max() elif field == 'low': - frame.ix[algo_dt] = current_day_frame.min() + return_frame.ix[algo_dt] = buffer_frame.min() elif field == 'open_price': - frame.ix[algo_dt] = current_day_frame.ix[0] + return_frame.ix[algo_dt] = buffer_frame.ix[0] else: - frame.ix[algo_dt] = current_day_frame.ix[algo_dt] + return_frame.ix[algo_dt] = buffer_frame.ix[algo_dt] - return frame + return return_frame diff --git a/zipline/utils/data.py b/zipline/utils/data.py index 67a3c4ae..5dbd8c31 100644 --- a/zipline/utils/data.py +++ b/zipline/utils/data.py @@ -32,8 +32,8 @@ class RollingPanel(object): Restrictions: major_axis can only be a DatetimeIndex for now """ - def __init__(self, window, items, sids, cap_multiple=2, - dtype=np.float64): + def __init__(self, window, items, sids, cap_multiple=2, dtype=np.float64): + self.pos = 0 self.window = window @@ -49,9 +49,14 @@ class RollingPanel(object): self.buffer = self._create_buffer() def _create_buffer(self): - return pd.Panel(items=self.items, minor_axis=self.minor_axis, - major_axis=range(self.cap), - dtype=self.dtype) + panel = pd.Panel( + items=self.items, + minor_axis=self.minor_axis, + major_axis=range(self.cap), + dtype=self.dtype, + ) + + return panel def _update_buffer(self, frame): # Drop outdated, nan-filled minors (sids) and items (fields) From 15f1947652656433d8d83d58c12332236a6343a2 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 2 Jun 2014 23:22:41 -0400 Subject: [PATCH 3/6] BUG: Don't return mostly nans from `get_history` when `ffill=False`. Fixes an issue where, if `ffill=False`, `get_history` would return nans for every entry in the history frame except the last one. --- zipline/history/history_container.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipline/history/history_container.py b/zipline/history/history_container.py index ce2da464..f0b31c4a 100644 --- a/zipline/history/history_container.py +++ b/zipline/history/history_container.py @@ -416,6 +416,8 @@ class HistoryContainer(object): # e.g. with leading nans. pass digest_frame = digest_frame.ffill() + + if digest_frame is not None: return_frame.ix[:-1] = digest_frame.ix[:] # Get minutes from our buffer panel to build the last row. From 3a1fc1032e4f815610f4fc2c1ca1755265bfc9d1 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 3 Jun 2014 15:26:55 -0400 Subject: [PATCH 4/6] ENH: Overhaul logic in `HistoryContainer`. Updates `HistoryContainer.roll` to handle cases where no data is present for the period being rolled. We now only forward-fill the `price` field when `ffill` is specified. --- zipline/history/history.py | 12 +- zipline/history/history_container.py | 166 +++++++++++++++------------ 2 files changed, 104 insertions(+), 74 deletions(-) diff --git a/zipline/history/history.py b/zipline/history/history.py index 9e7853de..2adbd103 100644 --- a/zipline/history/history.py +++ b/zipline/history/history.py @@ -219,6 +219,8 @@ class HistorySpec(object): result frames. """ + FORWARD_FILLABLE = frozenset({'price'}) + @classmethod def spec_key(cls, bar_count, freq_str, field, ffill): """ @@ -237,12 +239,20 @@ class HistorySpec(object): # The field, e.g. 'price', 'volume', etc. self.field = field # Whether or not to forward fill the nan data. - self.ffill = ffill + self._ffill = ffill # Calculate the cache key string once. self.key_str = self.spec_key( bar_count, frequency.freq_str, field, ffill) + @property + def ffill(self): + """ + Wrapper around ffill that returns False for fields which are not + forward-fillable. + """ + return self._ffill and self.field in self.FORWARD_FILLABLE + def __repr__(self): return ''.join([self.__class__.__name__, "('", self.key_str, "')"]) diff --git a/zipline/history/history_container.py b/zipline/history/history_container.py index f0b31c4a..38811c88 100644 --- a/zipline/history/history_container.py +++ b/zipline/history/history_container.py @@ -26,9 +26,10 @@ from . history import ( from zipline.finance import trading from zipline.utils.data import RollingPanel + # The closing price is referred to by multiple names, # allow both for price rollover logic etc. -CLOSING_PRICE_FIELDS = {'price', 'close_price'} +CLOSING_PRICE_FIELDS = frozenset({'price', 'close_price'}) def ffill_buffer_from_prior_values(field, @@ -40,10 +41,6 @@ def ffill_buffer_from_prior_values(field, digest frame if the buffer frame has leading NaNs. """ - if field == 'volume': - # Volume is never forward-filled. - return buffer_frame - # Get values which are NaN at the beginning of the period. first_bar = buffer_frame.iloc[0] @@ -72,6 +69,35 @@ def ffill_buffer_from_prior_values(field, return buffer_frame.ffill() +def ffill_digest_frame_from_prior_values(field, digest_frame, prior_values): + """ + Forward-fill a digest frame, falling back to the last known priof values if + necessary. + """ + if digest_frame is not None: + # Digest frame is None in the case that we only have length 1 history + # specs for a given frequency. + + # It's possible that the first bar in our digest frame is storing NaN + # values. If so, check if we've tracked an older value and use that as + # an ffill value for the first bar. + first_bar = digest_frame.ix[0] + nan_sids = first_bar[first_bar.isnull()].index + for sid in nan_sids: + try: + # Only use prior value if it is before the index, + # so that a backfill does not accidentally occur. + if prior_values[field][sid]['dt'] <= digest_frame.index[0]: + digest_frame[sid][0] = prior_values[field][sid]['value'] + + except KeyError: + # Allow case where there is no previous value. + # e.g. with leading nans. + pass + digest_frame = digest_frame.ffill() + return digest_frame + + def freq_str_and_bar_count(history_spec): """ Helper for getting the frequency string from a history spec. @@ -329,41 +355,53 @@ class HistoryContainer(object): index=self.fields, columns=buffer_minutes.minor_axis) - if len(buffer_minutes.major_axis) > 0: - for field in self.fields: - if field in CLOSING_PRICE_FIELDS: - # Use the last price. - prices = buffer_minutes.ffill().ix[field, -1, :] - rolled.ix[field] = prices - elif field == 'open_price': - # Use the first price. - opens = buffer_minutes.ix['open_price', 0, :] - rolled.ix['open_price'] = opens - elif field == 'volume': - # Volume is the sum of the volumes during the - # course of the day - volumes = buffer_minutes.ix['volume'].apply(np.sum) - rolled.ix['volume'] = volumes - elif field == 'high': - # Use the highest high. - highs = buffer_minutes.ix['high'].apply(np.max) - rolled.ix['high'] = highs - elif field == 'low': - # Use the lowest low. - lows = buffer_minutes.ix['low'].apply(np.min) - rolled.ix['low'] = lows + for field in self.fields: - for sid, value in rolled.ix[field].iterkv(): - if not np.isnan(value): - try: - prior_values = \ - self.last_known_prior_values[field][sid] - except KeyError: - prior_values = {} - self.last_known_prior_values[field][sid] = \ - prior_values - prior_values['dt'] = digest_dt - prior_values['value'] = value + if field in CLOSING_PRICE_FIELDS: + # Use the last close, or NaN if we have no minutes. + try: + prices = buffer_minutes.loc[field].ffill().iloc[-1] + except IndexError: + # Scalar assignment sets the value for all entries. + prices = np.nan + rolled.ix[field] = prices + + elif field == 'open_price': + # Use the first open, or NaN if we have no minutes. + try: + opens = buffer_minutes.loc[field].bfill().iloc[0] + except IndexError: + # Scalar assignment sets the value for all entries. + opens = np.nan + rolled.ix['open_price'] = opens + + elif field == 'volume': + # Volume is the sum of the volumes during the + # course of the period. + volumes = buffer_minutes.ix['volume'].sum().fillna(0) + rolled.ix['volume'] = volumes + + elif field == 'high': + # Use the highest high. + highs = buffer_minutes.ix['high'].max() + rolled.ix['high'] = highs + + elif field == 'low': + # Use the lowest low. + lows = buffer_minutes.ix['low'].min() + rolled.ix['low'] = lows + + for sid, value in rolled.ix[field].iterkv(): + if not np.isnan(value): + try: + prior_values = \ + self.last_known_prior_values[field][sid] + except KeyError: + prior_values = {} + self.last_known_prior_values[field][sid] = \ + prior_values + prior_values['dt'] = digest_dt + prior_values['value'] = value digest_panel.add_frame(digest_dt, rolled) @@ -377,6 +415,8 @@ class HistoryContainer(object): field = history_spec.field bar_count = history_spec.bar_count + do_ffill = history_spec.ffill + index = pd.to_datetime(index_at_dt(history_spec, algo_dt)) return_frame = self.return_frames[history_spec.key_str] @@ -394,56 +434,36 @@ class HistoryContainer(object): else: digest_frame = None - if digest_frame is not None and history_spec.ffill: - # It's possible that the first bar in our digest frame is storing - # NaN values. If so, check if we've tracked an older value and use - # that as an ffill value for the first bar. - first_bar = digest_frame.ix[0] - nan_sids = first_bar[first_bar.isnull()].index - for sid in nan_sids: - try: - # Only use prior value if it is before the index, - # so that a backfill does not accidentally occur. - have_pre_frame_value = ( - self.last_known_prior_values[field][sid]['dt'] <= - digest_frame.index[0] - ) - if have_pre_frame_value: - digest_frame[sid][0] =\ - self.last_known_prior_values[field][sid]['value'] - except KeyError: - # Allow case where there is no previous value. - # e.g. with leading nans. - pass - digest_frame = digest_frame.ffill() - - if digest_frame is not None: - return_frame.ix[:-1] = digest_frame.ix[:] - # Get minutes from our buffer panel to build the last row. - frequency = history_spec.frequency buffer_frame = self.buffer_panel_minutes( - earliest_minute=self.cur_window_starts[frequency], + earliest_minute=self.cur_window_starts[history_spec.frequency], )[field].copy() - if history_spec.ffill: + if do_ffill: + digest_frame = ffill_digest_frame_from_prior_values( + field, + digest_frame, + self.last_known_prior_values, + ) buffer_frame = ffill_buffer_from_prior_values( field, buffer_frame, digest_frame, self.last_known_prior_values, ) + + if digest_frame is not None: + return_frame.ix[:-1] = digest_frame.ix[:] + if field == 'volume': - # This works for the day rollup, i.e. '1d', - # but '1m' will need to allow for 0 or nan minutes - return_frame.ix[algo_dt] = buffer_frame.sum() + return_frame.ix[algo_dt] = buffer_frame.fillna(0).sum() elif field == 'high': return_frame.ix[algo_dt] = buffer_frame.max() elif field == 'low': return_frame.ix[algo_dt] = buffer_frame.min() elif field == 'open_price': - return_frame.ix[algo_dt] = buffer_frame.ix[0] + return_frame.ix[algo_dt] = buffer_frame.iloc[0] else: - return_frame.ix[algo_dt] = buffer_frame.ix[algo_dt] + return_frame.ix[algo_dt] = buffer_frame.loc[algo_dt] return return_frame From cfe00b0c371c46810942ac544766cab6ee539ab6 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 2 Jun 2014 23:43:13 -0400 Subject: [PATCH 5/6] ENH/TEST: Enable '1m' history and add tests for the frequency. --- tests/history_cases.py | 578 +++++++++++++++++++++++++++++++++++++ tests/test_history.py | 94 +++++- zipline/history/history.py | 2 +- zipline/protocol.py | 7 +- 4 files changed, 668 insertions(+), 13 deletions(-) create mode 100644 tests/history_cases.py diff --git a/tests/history_cases.py b/tests/history_cases.py new file mode 100644 index 00000000..2ba6e0a2 --- /dev/null +++ b/tests/history_cases.py @@ -0,0 +1,578 @@ +""" +Test case definitions for history tests. +""" + +import pandas as pd +import numpy as np + +from zipline.finance.trading import TradingEnvironment +from zipline.history.history import HistorySpec +from zipline.protocol import BarData + + +def to_utc(time_str): + return pd.Timestamp(time_str, tz='US/Eastern').tz_convert('UTC') + + +def mixed_frequency_expected_index(count, frequency): + """ + Helper for enumerating expected indices for test_mixed_frequency. + """ + env = TradingEnvironment.instance() + minute = MIXED_FREQUENCY_MINUTES[count] + + if frequency == '1d': + return [env.previous_open_and_close(minute)[1], minute] + elif frequency == '1m': + return [env.previous_market_minute(minute), minute] + + +def mixed_frequency_expected_data(count, frequency): + """ + Helper for enumerating expected data test_mixed_frequency. + """ + if frequency == '1d': + if count < 390: + return [np.nan, count] + else: + return [389, count] + elif frequency == '1m': + if count == 0: + return [np.nan, count] + else: + return [count - 1, count] + + +MIXED_FREQUENCY_MINUTES = TradingEnvironment.instance().market_minute_window( + to_utc('2013-06-28 9:31AM'), 780, +) +DAILY_OPEN_CLOSE_SPECS = [ + HistorySpec(3, '1d', 'open_price', False), + HistorySpec(3, '1d', 'close_price', False), +] +ILLIQUID_PRICES_SPECS = [ + HistorySpec(3, '1m', 'price', False), + HistorySpec(5, '1m', 'price', True), +] +MIXED_FREQUENCY_SPECS = [ + HistorySpec(2, '1m', 'price', False), + HistorySpec(2, '1d', 'price', False), +] +MIXED_FIELDS_SPECS = [ + HistorySpec(3, '1m', 'price', True), + HistorySpec(3, '1m', 'open_price', True), + HistorySpec(3, '1m', 'close_price', True), + HistorySpec(3, '1m', 'high', True), + HistorySpec(3, '1m', 'low', True), + HistorySpec(3, '1m', 'volume', True), +] + + +HISTORY_CONTAINER_TEST_CASES = { + # June 2013 + # Su Mo Tu We Th Fr Sa + # 1 + # 2 3 4 5 6 7 8 + # 9 10 11 12 13 14 15 + # 16 17 18 19 20 21 22 + # 23 24 25 26 27 28 29 + # 30 + + 'test daily open close': { + # A list of HistorySpec objects. + 'specs': DAILY_OPEN_CLOSE_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + 'dt': to_utc('2013-06-21 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'open_price': 10, + 'close_price': 11, + 'dt': to_utc('2013-06-21 10:00AM'), + }, + }, + ), + BarData( + { + 1: { + 'open_price': 12, + 'close_price': 13, + 'dt': to_utc('2013-06-21 3:30PM'), + }, + }, + ), + BarData( + { + 1: { + 'open_price': 14, + 'close_price': 15, + # Wait a full market day before the next bar. + # We should end up with nans for Monday the 24th. + 'dt': to_utc('2013-06-25 9:31AM'), + }, + }, + ), + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': { + # open + DAILY_OPEN_CLOSE_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 10:00AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 3:30PM'), + ], + ), + + pd.DataFrame( + data={ + 1: [10, np.nan, 14] + }, + index=[ + to_utc('2013-06-21 4:00PM'), + to_utc('2013-06-24 4:00PM'), + to_utc('2013-06-25 9:31AM'), + ], + ), + ], + # close + DAILY_OPEN_CLOSE_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 11] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 10:00AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 13] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 3:30PM'), + ], + ), + + pd.DataFrame( + data={ + 1: [13, np.nan, 15] + }, + index=[ + to_utc('2013-06-21 4:00PM'), + to_utc('2013-06-24 4:00PM'), + to_utc('2013-06-25 9:31AM'), + ], + ), + ], + }, + }, + + 'test illiquid prices': { + + # A list of HistorySpec objects. + 'specs': ILLIQUID_PRICES_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + 'dt': to_utc('2013-06-28 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'price': 10, + 'dt': to_utc('2013-06-28 9:31AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 11, + 'dt': to_utc('2013-06-28 9:32AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 12, + 'dt': to_utc('2013-06-28 9:33AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 13, + # Note: Skipping 9:34 to simulate illiquid bar/missing + # data. + 'dt': to_utc('2013-06-28 9:35AM'), + }, + }, + ), + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': { + ILLIQUID_PRICES_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, 10, 11], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [10, 11, 12], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + # Since there's no update for 9:34, this is called at 9:35. + pd.DataFrame( + data={ + 1: [12, np.nan, 13], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + + ILLIQUID_PRICES_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, np.nan, np.nan, 10], + }, + index=[ + to_utc('2013-06-27 3:57PM'), + to_utc('2013-06-27 3:58PM'), + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, np.nan, 10, 11], + }, + index=[ + to_utc('2013-06-27 3:58PM'), + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10, 11, 12], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + # Since there's no update for 9:34, this is called at 9:35. + # The 12 value from 9:33 should be forward-filled. + pd.DataFrame( + data={ + 1: [10, 11, 12, 12, 13], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + }, + }, + + 'test mixed frequencies': { + + # A list of HistorySpec objects. + 'specs': MIXED_FREQUENCY_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + 'dt': to_utc('2013-06-28 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'price': count, + 'dt': dt, + } + } + ) + for count, dt in enumerate(MIXED_FREQUENCY_MINUTES) + ], + + # Dictionary mapping spec_key -> list of expected outputs. + 'expected': { + MIXED_FREQUENCY_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: mixed_frequency_expected_data(count, '1m'), + }, + index=mixed_frequency_expected_index(count, '1m'), + ) + for count in range(len(MIXED_FREQUENCY_MINUTES)) + ], + + MIXED_FREQUENCY_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: mixed_frequency_expected_data(count, '1d'), + }, + index=mixed_frequency_expected_index(count, '1d'), + ) + for count in range(len(MIXED_FREQUENCY_MINUTES)) + ] + }, + }, + + 'test multiple fields and sids': { + + # A list of HistorySpec objects. + 'specs': MIXED_FIELDS_SPECS, + + # Sids for the test. + 'sids': [1, 10], + + # Start date for test. + 'dt': to_utc('2013-06-28 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'dt': dt, + 'price': count, + 'open_price': count, + 'close_price': count, + 'high': count, + 'low': count, + 'volume': count, + }, + 10: { + 'dt': dt, + 'price': count * 10, + 'open_price': count * 10, + 'close_price': count * 10, + 'high': count * 10, + 'low': count * 10, + 'volume': count * 10, + }, + }, + ) + for count, dt in enumerate([ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + # NOTE: No update for 9:34 + to_utc('2013-06-28 9:35AM'), + ]) + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': dict( + + # Build a dict from a list of tuples. Doing it this way because + # there are two distinct cases we want to test: forward-fillable + # fields and non-forward-fillable fields. + [ + ( + # Non forward-fill fields + key, + [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 0], + 10: [np.nan, np.nan, 0], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + + # Missing volume data should manifest as 0's rather + # than nans. + ).fillna(0 if 'volume' in key else np.nan), + pd.DataFrame( + data={ + 1: [np.nan, 0, 1], + 10: [np.nan, 0, 10], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ).fillna(0 if 'volume' in key else np.nan), + + pd.DataFrame( + data={ + 1: [0, 1, 2], + 10: [0, 10, 20], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + + # Note: Calling fillna() here even though there are + # no NaNs because this makes it less likely + # for us to introduce a stupid bug by + # copy/pasting in the future. + ).fillna(0 if 'volume' in key else np.nan), + pd.DataFrame( + data={ + 1: [2, np.nan, 3], + 10: [20, np.nan, 30], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ).fillna(0 if 'volume' in key else np.nan), + ], + ) + for key in [spec.key_str for spec in MIXED_FIELDS_SPECS + if spec.field not in HistorySpec.FORWARD_FILLABLE] + ] + + + # Concatenate the expected results for non-ffillable with + # expected result for ffillable. + [ + ( + # Forward-fillable fields + key, + [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 0], + 10: [np.nan, np.nan, 0], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, 0, 1], + 10: [np.nan, 0, 10], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [0, 1, 2], + 10: [0, 10, 20], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [2, 2, 3], + 10: [20, 20, 30], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + ) + for key in [spec.key_str for spec in MIXED_FIELDS_SPECS + if spec.field in HistorySpec.FORWARD_FILLABLE] + ] + ), + }, +} diff --git a/tests/test_history.py b/tests/test_history.py index 3ca7e10b..dc8fc7e1 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -24,10 +24,14 @@ from zipline.history.history_container import HistoryContainer from zipline.protocol import BarData import zipline.utils.factory as factory from zipline import TradingAlgorithm -from zipline.finance.trading import SimulationParameters +from zipline.finance.trading import SimulationParameters, TradingEnvironment from zipline.sources import RandomWalkSource +from .history_cases import ( + HISTORY_CONTAINER_TEST_CASES, +) + # Cases are over the July 4th holiday, to ensure use of trading calendar. # March 2013 @@ -73,7 +77,7 @@ from zipline.sources import RandomWalkSource # Times to be converted via: # pd.Timestamp('2013-07-05 9:31', tz='US/Eastern').tz_convert('UTC')}, -MINUTE_CASES_RAW = { +INDEX_TEST_CASES_RAW = { 'week of daily data': { 'input': {'bar_count': 5, 'frequency': '1d', @@ -86,6 +90,18 @@ MINUTE_CASES_RAW = { '2013-07-05 9:31AM', ] }, + 'five minutes on july 5th open': { + 'input': {'bar_count': 5, + 'frequency': '1m', + 'algo_dt': '2013-07-05 9:31AM'}, + 'expected': [ + '2013-07-03 12:57PM', + '2013-07-03 12:58PM', + '2013-07-03 12:59PM', + '2013-07-03 1:00PM', + '2013-07-05 9:31AM', + ] + }, } @@ -104,28 +120,31 @@ def convert_cases(cases): in case['expected']]) return cases -MINUTE_CASES = convert_cases(MINUTE_CASES_RAW) +INDEX_TEST_CASES = convert_cases(INDEX_TEST_CASES_RAW) -def index_at_dt(case_input): +def get_index_at_dt(case_input): history_spec = history.HistorySpec( case_input['bar_count'], case_input['frequency'], None, False ) - return history.index_at_dt(history_spec, - case_input['algo_dt']) + return history.index_at_dt(history_spec, case_input['algo_dt']) class TestHistoryIndex(TestCase): + @classmethod + def setUpClass(cls): + cls.environment = TradingEnvironment.instance() + @parameterized.expand( [(name, case['input'], case['expected']) - for name, case in MINUTE_CASES.items()] + for name, case in INDEX_TEST_CASES.items()] ) def test_index_at_dt(self, name, case_input, expected): - history_index = index_at_dt(case_input) + history_index = get_index_at_dt(case_input) history_series = pd.Series(index=history_index) expected_series = pd.Series(index=expected) @@ -135,9 +154,64 @@ class TestHistoryIndex(TestCase): class TestHistoryContainer(TestCase): + @classmethod + def setUpClass(cls): + cls.env = TradingEnvironment.instance() + + def bar_data_dt(self, bar_data, require_unique=True): + """ + Get a dt to associate with the given BarData object. + + If require_unique == True, throw an error if multiple unique dt's are + encountered. Otherwise, return the earliest dt encountered. + """ + dts = {sid_data['dt'] for sid_data in bar_data.values()} + if require_unique and len(dts) > 1: + self.fail("Multiple unique dts ({0}) in {1}".format(dts, bar_data)) + + return sorted(dts)[0] + + @parameterized.expand( + [(name, + case['specs'], + case['sids'], + case['dt'], + case['updates'], + case['expected']) + for name, case in HISTORY_CONTAINER_TEST_CASES.items()] + ) + def test_history_container(self, + name, + specs, + sids, + dt, + updates, + expected): + + for spec in specs: + # Sanity check on test input. + self.assertEqual(len(expected[spec.key_str]), len(updates)) + + container = HistoryContainer( + {spec.key_str: spec for spec in specs}, sids, dt + ) + + for update_count, update in enumerate(updates): + + bar_dt = self.bar_data_dt(update) + container.update(update, bar_dt) + + for spec in specs: + pd.util.testing.assert_frame_equal( + container.get_history(spec, bar_dt), + expected[spec.key_str][update_count], + check_dtype=False, + check_column_type=True, + check_index_type=True, + check_frame_type=True, + ) + def test_container_nans_and_daily_roll(self): - # set up trading environment - factory.create_simulation_parameters(num_days=4) spec = history.HistorySpec( bar_count=3, diff --git a/zipline/history/history.py b/zipline/history/history.py index 2adbd103..e5b147b3 100644 --- a/zipline/history/history.py +++ b/zipline/history/history.py @@ -38,7 +38,7 @@ class Frequency(object): the elapsed minutes of the (incomplete) current day - "1m" provides data at minute frequency """ - SUPPORTED_FREQUENCIES = frozenset({'1d'}) + SUPPORTED_FREQUENCIES = frozenset({'1d', '1m'}) MAX_MINUTES = {'m': 1, 'd': 390} def __init__(self, freq_str): diff --git a/zipline/protocol.py b/zipline/protocol.py index e3983de8..7d78119d 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -157,8 +157,8 @@ class BarData(object): usage of what this replaced as a dictionary subclass. """ - def __init__(self): - self._data = {} + def __init__(self, data=None): + self._data = data or {} self._contains_override = None def __contains__(self, name): @@ -217,3 +217,6 @@ class BarData(object): def __len__(self): return len(self.keys()) + + def __repr__(self): + return '{0}({1})'.format(self.__class__.__name__, self._data) From fba649dd7aafcf394a433ad1001186b6f51e28e2 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Thu, 5 Jun 2014 13:55:21 -0400 Subject: [PATCH 6/6] TST: Changed test_mixed_frequencies to use a half day. Also adds a length-1 HistorySpec to the test. --- tests/history_cases.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/tests/history_cases.py b/tests/history_cases.py index 2ba6e0a2..5f8f3dd2 100644 --- a/tests/history_cases.py +++ b/tests/history_cases.py @@ -32,10 +32,11 @@ def mixed_frequency_expected_data(count, frequency): Helper for enumerating expected data test_mixed_frequency. """ if frequency == '1d': - if count < 390: + # First day of this test is July 3rd, which is a half day. + if count < 210: return [np.nan, count] else: - return [389, count] + return [209, count] elif frequency == '1m': if count == 0: return [np.nan, count] @@ -44,7 +45,7 @@ def mixed_frequency_expected_data(count, frequency): MIXED_FREQUENCY_MINUTES = TradingEnvironment.instance().market_minute_window( - to_utc('2013-06-28 9:31AM'), 780, + to_utc('2013-07-03 9:31AM'), 600, ) DAILY_OPEN_CLOSE_SPECS = [ HistorySpec(3, '1d', 'open_price', False), @@ -55,6 +56,7 @@ ILLIQUID_PRICES_SPECS = [ HistorySpec(5, '1m', 'price', True), ] MIXED_FREQUENCY_SPECS = [ + HistorySpec(1, '1m', 'price', False), HistorySpec(2, '1m', 'price', False), HistorySpec(2, '1d', 'price', False), ] @@ -361,7 +363,14 @@ HISTORY_CONTAINER_TEST_CASES = { 'sids': [1], # Start date for test. - 'dt': to_utc('2013-06-28 9:31AM'), + # July 2013 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 6 + # 7 8 9 10 11 12 13 + # 14 15 16 17 18 19 20 + # 21 22 23 24 25 26 27 + # 28 29 30 31 + 'dt': to_utc('2013-07-03 9:31AM'), # Sequence of updates to the container 'updates': [ @@ -378,7 +387,18 @@ HISTORY_CONTAINER_TEST_CASES = { # Dictionary mapping spec_key -> list of expected outputs. 'expected': { + MIXED_FREQUENCY_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [count], + }, + index=[minute], + ) + for count, minute in enumerate(MIXED_FREQUENCY_MINUTES) + ], + + MIXED_FREQUENCY_SPECS[1].key_str: [ pd.DataFrame( data={ 1: mixed_frequency_expected_data(count, '1m'), @@ -388,7 +408,7 @@ HISTORY_CONTAINER_TEST_CASES = { for count in range(len(MIXED_FREQUENCY_MINUTES)) ], - MIXED_FREQUENCY_SPECS[1].key_str: [ + MIXED_FREQUENCY_SPECS[2].key_str: [ pd.DataFrame( data={ 1: mixed_frequency_expected_data(count, '1d'),