diff --git a/tests/history_cases.py b/tests/history_cases.py new file mode 100644 index 00000000..5f8f3dd2 --- /dev/null +++ b/tests/history_cases.py @@ -0,0 +1,598 @@ +""" +Test case definitions for history tests. +""" + +import pandas as pd +import numpy as np + +from zipline.finance.trading import TradingEnvironment +from zipline.history.history import HistorySpec +from zipline.protocol import BarData + + +def to_utc(time_str): + return pd.Timestamp(time_str, tz='US/Eastern').tz_convert('UTC') + + +def mixed_frequency_expected_index(count, frequency): + """ + Helper for enumerating expected indices for test_mixed_frequency. + """ + env = TradingEnvironment.instance() + minute = MIXED_FREQUENCY_MINUTES[count] + + if frequency == '1d': + return [env.previous_open_and_close(minute)[1], minute] + elif frequency == '1m': + return [env.previous_market_minute(minute), minute] + + +def mixed_frequency_expected_data(count, frequency): + """ + Helper for enumerating expected data test_mixed_frequency. + """ + if frequency == '1d': + # First day of this test is July 3rd, which is a half day. + if count < 210: + return [np.nan, count] + else: + return [209, count] + elif frequency == '1m': + if count == 0: + return [np.nan, count] + else: + return [count - 1, count] + + +MIXED_FREQUENCY_MINUTES = TradingEnvironment.instance().market_minute_window( + to_utc('2013-07-03 9:31AM'), 600, +) +DAILY_OPEN_CLOSE_SPECS = [ + HistorySpec(3, '1d', 'open_price', False), + HistorySpec(3, '1d', 'close_price', False), +] +ILLIQUID_PRICES_SPECS = [ + HistorySpec(3, '1m', 'price', False), + HistorySpec(5, '1m', 'price', True), +] +MIXED_FREQUENCY_SPECS = [ + HistorySpec(1, '1m', 'price', False), + HistorySpec(2, '1m', 'price', False), + HistorySpec(2, '1d', 'price', False), +] +MIXED_FIELDS_SPECS = [ + HistorySpec(3, '1m', 'price', True), + HistorySpec(3, '1m', 'open_price', True), + HistorySpec(3, '1m', 'close_price', True), + HistorySpec(3, '1m', 'high', True), + HistorySpec(3, '1m', 'low', True), + HistorySpec(3, '1m', 'volume', True), +] + + +HISTORY_CONTAINER_TEST_CASES = { + # June 2013 + # Su Mo Tu We Th Fr Sa + # 1 + # 2 3 4 5 6 7 8 + # 9 10 11 12 13 14 15 + # 16 17 18 19 20 21 22 + # 23 24 25 26 27 28 29 + # 30 + + 'test daily open close': { + # A list of HistorySpec objects. + 'specs': DAILY_OPEN_CLOSE_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + 'dt': to_utc('2013-06-21 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'open_price': 10, + 'close_price': 11, + 'dt': to_utc('2013-06-21 10:00AM'), + }, + }, + ), + BarData( + { + 1: { + 'open_price': 12, + 'close_price': 13, + 'dt': to_utc('2013-06-21 3:30PM'), + }, + }, + ), + BarData( + { + 1: { + 'open_price': 14, + 'close_price': 15, + # Wait a full market day before the next bar. + # We should end up with nans for Monday the 24th. + 'dt': to_utc('2013-06-25 9:31AM'), + }, + }, + ), + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': { + # open + DAILY_OPEN_CLOSE_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 10:00AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 3:30PM'), + ], + ), + + pd.DataFrame( + data={ + 1: [10, np.nan, 14] + }, + index=[ + to_utc('2013-06-21 4:00PM'), + to_utc('2013-06-24 4:00PM'), + to_utc('2013-06-25 9:31AM'), + ], + ), + ], + # close + DAILY_OPEN_CLOSE_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 11] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 10:00AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 13] + }, + index=[ + to_utc('2013-06-19 4:00PM'), + to_utc('2013-06-20 4:00PM'), + to_utc('2013-06-21 3:30PM'), + ], + ), + + pd.DataFrame( + data={ + 1: [13, np.nan, 15] + }, + index=[ + to_utc('2013-06-21 4:00PM'), + to_utc('2013-06-24 4:00PM'), + to_utc('2013-06-25 9:31AM'), + ], + ), + ], + }, + }, + + 'test illiquid prices': { + + # A list of HistorySpec objects. + 'specs': ILLIQUID_PRICES_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + 'dt': to_utc('2013-06-28 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'price': 10, + 'dt': to_utc('2013-06-28 9:31AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 11, + 'dt': to_utc('2013-06-28 9:32AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 12, + 'dt': to_utc('2013-06-28 9:33AM'), + }, + }, + ), + BarData( + { + 1: { + 'price': 13, + # Note: Skipping 9:34 to simulate illiquid bar/missing + # data. + 'dt': to_utc('2013-06-28 9:35AM'), + }, + }, + ), + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': { + ILLIQUID_PRICES_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, 10, 11], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [10, 11, 12], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + # Since there's no update for 9:34, this is called at 9:35. + pd.DataFrame( + data={ + 1: [12, np.nan, 13], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + + ILLIQUID_PRICES_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, np.nan, np.nan, 10], + }, + index=[ + to_utc('2013-06-27 3:57PM'), + to_utc('2013-06-27 3:58PM'), + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, np.nan, 10, 11], + }, + index=[ + to_utc('2013-06-27 3:58PM'), + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 10, 11, 12], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + # Since there's no update for 9:34, this is called at 9:35. + # The 12 value from 9:33 should be forward-filled. + pd.DataFrame( + data={ + 1: [10, 11, 12, 12, 13], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + }, + }, + + 'test mixed frequencies': { + + # A list of HistorySpec objects. + 'specs': MIXED_FREQUENCY_SPECS, + + # Sids for the test. + 'sids': [1], + + # Start date for test. + # July 2013 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 6 + # 7 8 9 10 11 12 13 + # 14 15 16 17 18 19 20 + # 21 22 23 24 25 26 27 + # 28 29 30 31 + 'dt': to_utc('2013-07-03 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'price': count, + 'dt': dt, + } + } + ) + for count, dt in enumerate(MIXED_FREQUENCY_MINUTES) + ], + + # Dictionary mapping spec_key -> list of expected outputs. + 'expected': { + + MIXED_FREQUENCY_SPECS[0].key_str: [ + pd.DataFrame( + data={ + 1: [count], + }, + index=[minute], + ) + for count, minute in enumerate(MIXED_FREQUENCY_MINUTES) + ], + + MIXED_FREQUENCY_SPECS[1].key_str: [ + pd.DataFrame( + data={ + 1: mixed_frequency_expected_data(count, '1m'), + }, + index=mixed_frequency_expected_index(count, '1m'), + ) + for count in range(len(MIXED_FREQUENCY_MINUTES)) + ], + + MIXED_FREQUENCY_SPECS[2].key_str: [ + pd.DataFrame( + data={ + 1: mixed_frequency_expected_data(count, '1d'), + }, + index=mixed_frequency_expected_index(count, '1d'), + ) + for count in range(len(MIXED_FREQUENCY_MINUTES)) + ] + }, + }, + + 'test multiple fields and sids': { + + # A list of HistorySpec objects. + 'specs': MIXED_FIELDS_SPECS, + + # Sids for the test. + 'sids': [1, 10], + + # Start date for test. + 'dt': to_utc('2013-06-28 9:31AM'), + + # Sequence of updates to the container + 'updates': [ + BarData( + { + 1: { + 'dt': dt, + 'price': count, + 'open_price': count, + 'close_price': count, + 'high': count, + 'low': count, + 'volume': count, + }, + 10: { + 'dt': dt, + 'price': count * 10, + 'open_price': count * 10, + 'close_price': count * 10, + 'high': count * 10, + 'low': count * 10, + 'volume': count * 10, + }, + }, + ) + for count, dt in enumerate([ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + # NOTE: No update for 9:34 + to_utc('2013-06-28 9:35AM'), + ]) + ], + + # Dictionary mapping spec_key -> list of expected outputs + 'expected': dict( + + # Build a dict from a list of tuples. Doing it this way because + # there are two distinct cases we want to test: forward-fillable + # fields and non-forward-fillable fields. + [ + ( + # Non forward-fill fields + key, + [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 0], + 10: [np.nan, np.nan, 0], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + + # Missing volume data should manifest as 0's rather + # than nans. + ).fillna(0 if 'volume' in key else np.nan), + pd.DataFrame( + data={ + 1: [np.nan, 0, 1], + 10: [np.nan, 0, 10], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ).fillna(0 if 'volume' in key else np.nan), + + pd.DataFrame( + data={ + 1: [0, 1, 2], + 10: [0, 10, 20], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + + # Note: Calling fillna() here even though there are + # no NaNs because this makes it less likely + # for us to introduce a stupid bug by + # copy/pasting in the future. + ).fillna(0 if 'volume' in key else np.nan), + pd.DataFrame( + data={ + 1: [2, np.nan, 3], + 10: [20, np.nan, 30], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ).fillna(0 if 'volume' in key else np.nan), + ], + ) + for key in [spec.key_str for spec in MIXED_FIELDS_SPECS + if spec.field not in HistorySpec.FORWARD_FILLABLE] + ] + + + # Concatenate the expected results for non-ffillable with + # expected result for ffillable. + [ + ( + # Forward-fillable fields + key, + [ + pd.DataFrame( + data={ + 1: [np.nan, np.nan, 0], + 10: [np.nan, np.nan, 0], + }, + index=[ + to_utc('2013-06-27 3:59PM'), + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [np.nan, 0, 1], + 10: [np.nan, 0, 10], + }, + index=[ + to_utc('2013-06-27 4:00PM'), + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [0, 1, 2], + 10: [0, 10, 20], + }, + index=[ + to_utc('2013-06-28 9:31AM'), + to_utc('2013-06-28 9:32AM'), + to_utc('2013-06-28 9:33AM'), + ], + ), + + pd.DataFrame( + data={ + 1: [2, 2, 3], + 10: [20, 20, 30], + }, + index=[ + to_utc('2013-06-28 9:33AM'), + to_utc('2013-06-28 9:34AM'), + to_utc('2013-06-28 9:35AM'), + ], + ), + ], + ) + for key in [spec.key_str for spec in MIXED_FIELDS_SPECS + if spec.field in HistorySpec.FORWARD_FILLABLE] + ] + ), + }, +} diff --git a/tests/test_finance.py b/tests/test_finance.py index 7eec21b8..a9fa724d 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -40,6 +40,7 @@ from zipline.finance.blotter import Blotter from zipline.gens.composites import date_sorted_sources from zipline.finance import trading +from zipline.finance.trading import TradingEnvironment from zipline.finance.execution import MarketOrder, LimitOrder from zipline.finance.trading import SimulationParameters @@ -80,88 +81,6 @@ class FinanceTestCase(TestCase): self.assertTrue(trade.dt > prev.dt) prev = trade - @timed(DEFAULT_TIMEOUT) - def test_trading_environment(self): - # holidays taken from: http://www.nyse.com/press/1191407641943.html - new_years = datetime(2008, 1, 1, tzinfo=pytz.utc) - mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc) - presidents = datetime(2008, 2, 18, tzinfo=pytz.utc) - good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc) - memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc) - july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc) - labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc) - tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc) - christmas = datetime(2008, 5, 25, tzinfo=pytz.utc) - a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc) - a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc) - holidays = [ - new_years, - mlk_day, - presidents, - good_friday, - memorial_day, - july_4th, - labor_day, - tgiving, - christmas, - a_saturday, - a_sunday - ] - - for holiday in holidays: - self.assertTrue(not trading.environment.is_trading_day(holiday)) - - first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc) - last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc) - workdays = [first_trading_day, last_trading_day] - - for workday in workdays: - self.assertTrue(trading.environment.is_trading_day(workday)) - - def test_simulation_parameters(self): - env = SimulationParameters( - period_start=datetime(2008, 1, 1, tzinfo=pytz.utc), - period_end=datetime(2008, 12, 31, tzinfo=pytz.utc), - capital_base=100000, - ) - - self.assertTrue(env.last_close.month == 12) - self.assertTrue(env.last_close.day == 31) - - @timed(DEFAULT_TIMEOUT) - def test_sim_params_days_in_period(self): - - # January 2008 - # Su Mo Tu We Th Fr Sa - # 1 2 3 4 5 - # 6 7 8 9 10 11 12 - # 13 14 15 16 17 18 19 - # 20 21 22 23 24 25 26 - # 27 28 29 30 31 - - env = SimulationParameters( - period_start=datetime(2007, 12, 31, tzinfo=pytz.utc), - period_end=datetime(2008, 1, 7, tzinfo=pytz.utc), - capital_base=100000, - ) - - expected_trading_days = ( - datetime(2007, 12, 31, tzinfo=pytz.utc), - # Skip new years - # holidays taken from: http://www.nyse.com/press/1191407641943.html - datetime(2008, 1, 2, tzinfo=pytz.utc), - datetime(2008, 1, 3, tzinfo=pytz.utc), - datetime(2008, 1, 4, tzinfo=pytz.utc), - # Skip Saturday - # Skip Sunday - datetime(2008, 1, 7, tzinfo=pytz.utc) - ) - - num_expected_trading_days = 5 - self.assertEquals(num_expected_trading_days, env.days_in_period) - np.testing.assert_array_equal(expected_trading_days, - env.trading_days.tolist()) - @timed(EXTENDED_TIMEOUT) def test_full_zipline(self): # provide enough trades to ensure all orders are filled. @@ -429,3 +348,183 @@ class FinanceTestCase(TestCase): self.assertEqual(300, fls_order['amount']) self.assertEqual(3.33, fls_order['limit']) self.assertEqual(2, fls_order['sid']) + + +class TradingEnvironmentTestCase(TestCase): + """ + Tests for date management utilities in zipline.finance.trading. + """ + + def setUp(self): + setup_logger(self) + + def tearDown(self): + teardown_logger(self) + + @classmethod + def setUpClass(cls): + cls.env = TradingEnvironment() + + @timed(DEFAULT_TIMEOUT) + def test_is_trading_day(self): + # holidays taken from: http://www.nyse.com/press/1191407641943.html + new_years = datetime(2008, 1, 1, tzinfo=pytz.utc) + mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc) + presidents = datetime(2008, 2, 18, tzinfo=pytz.utc) + good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc) + memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc) + july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc) + labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc) + tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc) + christmas = datetime(2008, 5, 25, tzinfo=pytz.utc) + a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc) + a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc) + holidays = [ + new_years, + mlk_day, + presidents, + good_friday, + memorial_day, + july_4th, + labor_day, + tgiving, + christmas, + a_saturday, + a_sunday + ] + + for holiday in holidays: + self.assertTrue(not self.env.is_trading_day(holiday)) + + first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc) + last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc) + workdays = [first_trading_day, last_trading_day] + + for workday in workdays: + self.assertTrue(self.env.is_trading_day(workday)) + + def test_simulation_parameters(self): + env = SimulationParameters( + period_start=datetime(2008, 1, 1, tzinfo=pytz.utc), + period_end=datetime(2008, 12, 31, tzinfo=pytz.utc), + capital_base=100000, + ) + + self.assertTrue(env.last_close.month == 12) + self.assertTrue(env.last_close.day == 31) + + @timed(DEFAULT_TIMEOUT) + def test_sim_params_days_in_period(self): + + # January 2008 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + env = SimulationParameters( + period_start=datetime(2007, 12, 31, tzinfo=pytz.utc), + period_end=datetime(2008, 1, 7, tzinfo=pytz.utc), + capital_base=100000, + ) + + expected_trading_days = ( + datetime(2007, 12, 31, tzinfo=pytz.utc), + # Skip new years + # holidays taken from: http://www.nyse.com/press/1191407641943.html + datetime(2008, 1, 2, tzinfo=pytz.utc), + datetime(2008, 1, 3, tzinfo=pytz.utc), + datetime(2008, 1, 4, tzinfo=pytz.utc), + # Skip Saturday + # Skip Sunday + datetime(2008, 1, 7, tzinfo=pytz.utc) + ) + + num_expected_trading_days = 5 + self.assertEquals(num_expected_trading_days, env.days_in_period) + np.testing.assert_array_equal(expected_trading_days, + env.trading_days.tolist()) + + @timed(DEFAULT_TIMEOUT) + def test_market_minute_window(self): + + # January 2008 + # Su Mo Tu We Th Fr Sa + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + us_east = pytz.timezone('US/Eastern') + utc = pytz.utc + + # 10:01 AM Eastern on January 7th.. + start = us_east.localize(datetime(2008, 1, 7, 10, 1)) + utc_start = start.astimezone(utc) + + # Get the next 10 minutes + minutes = self.env.market_minute_window( + utc_start, 10, + ) + self.assertEqual(len(minutes), 10) + for i in range(10): + self.assertEqual(minutes[i], utc_start + timedelta(minutes=i)) + + # Get the previous 10 minutes. + minutes = self.env.market_minute_window( + utc_start, 10, step=-1, + ) + self.assertEqual(len(minutes), 10) + for i in range(10): + self.assertEqual(minutes[i], utc_start + timedelta(minutes=-i)) + + # Get the next 900 minutes, including utc_start, rolling over into the + # next two days. + # Should include: + # Today: 10:01 AM -> 4:00 PM (360 minutes) + # Tomorrow: 9:31 AM -> 4:00 PM (390 minutes, 750 total) + # Last Day: 9:31 AM -> 12:00 PM (150 minutes, 900 total) + minutes = self.env.market_minute_window( + utc_start, 900, + ) + today = self.env.market_minutes_for_day(start)[30:] + tomorrow = self.env.market_minutes_for_day( + start + timedelta(days=1) + ) + last_day = self.env.market_minutes_for_day( + start + timedelta(days=2))[:150] + + self.assertEqual(len(minutes), 900) + self.assertEqual(minutes[0], utc_start) + self.assertTrue(all(today == minutes[:360])) + self.assertTrue(all(tomorrow == minutes[360:750])) + self.assertTrue(all(last_day == minutes[750:])) + + # Get the previous 801 minutes, including utc_start, rolling over into + # Friday the 4th and Thursday the 3rd. + # Should include: + # Today: 10:01 AM -> 9:31 AM (31 minutes) + # Friday: 4:00 PM -> 9:31 AM (390 minutes, 421 total) + # Thursday: 4:00 PM -> 9:41 AM (380 minutes, 801 total) + minutes = self.env.market_minute_window( + utc_start, 801, step=-1, + ) + + today = self.env.market_minutes_for_day(start)[30::-1] + # minus an extra two days from each of these to account for the two + # weekend days we skipped + friday = self.env.market_minutes_for_day( + start + timedelta(days=-3), + )[::-1] + thursday = self.env.market_minutes_for_day( + start + timedelta(days=-4), + )[:9:-1] + + self.assertEqual(len(minutes), 801) + self.assertEqual(minutes[0], utc_start) + self.assertTrue(all(today == minutes[:31])) + self.assertTrue(all(friday == minutes[31:421])) + self.assertTrue(all(thursday == minutes[421:])) diff --git a/tests/test_history.py b/tests/test_history.py index f4e0e400..dc8fc7e1 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -24,10 +24,14 @@ from zipline.history.history_container import HistoryContainer from zipline.protocol import BarData import zipline.utils.factory as factory from zipline import TradingAlgorithm -from zipline.finance.trading import SimulationParameters +from zipline.finance.trading import SimulationParameters, TradingEnvironment from zipline.sources import RandomWalkSource +from .history_cases import ( + HISTORY_CONTAINER_TEST_CASES, +) + # Cases are over the July 4th holiday, to ensure use of trading calendar. # March 2013 @@ -73,7 +77,7 @@ from zipline.sources import RandomWalkSource # Times to be converted via: # pd.Timestamp('2013-07-05 9:31', tz='US/Eastern').tz_convert('UTC')}, -MINUTE_CASES_RAW = { +INDEX_TEST_CASES_RAW = { 'week of daily data': { 'input': {'bar_count': 5, 'frequency': '1d', @@ -86,6 +90,18 @@ MINUTE_CASES_RAW = { '2013-07-05 9:31AM', ] }, + 'five minutes on july 5th open': { + 'input': {'bar_count': 5, + 'frequency': '1m', + 'algo_dt': '2013-07-05 9:31AM'}, + 'expected': [ + '2013-07-03 12:57PM', + '2013-07-03 12:58PM', + '2013-07-03 12:59PM', + '2013-07-03 1:00PM', + '2013-07-05 9:31AM', + ] + }, } @@ -104,28 +120,31 @@ def convert_cases(cases): in case['expected']]) return cases -MINUTE_CASES = convert_cases(MINUTE_CASES_RAW) +INDEX_TEST_CASES = convert_cases(INDEX_TEST_CASES_RAW) -def index_at_dt(case_input): +def get_index_at_dt(case_input): history_spec = history.HistorySpec( case_input['bar_count'], case_input['frequency'], None, False ) - return history.index_at_dt(history_spec, - case_input['algo_dt']) + return history.index_at_dt(history_spec, case_input['algo_dt']) class TestHistoryIndex(TestCase): + @classmethod + def setUpClass(cls): + cls.environment = TradingEnvironment.instance() + @parameterized.expand( [(name, case['input'], case['expected']) - for name, case in MINUTE_CASES.items()] + for name, case in INDEX_TEST_CASES.items()] ) def test_index_at_dt(self, name, case_input, expected): - history_index = index_at_dt(case_input) + history_index = get_index_at_dt(case_input) history_series = pd.Series(index=history_index) expected_series = pd.Series(index=expected) @@ -135,9 +154,64 @@ class TestHistoryIndex(TestCase): class TestHistoryContainer(TestCase): + @classmethod + def setUpClass(cls): + cls.env = TradingEnvironment.instance() + + def bar_data_dt(self, bar_data, require_unique=True): + """ + Get a dt to associate with the given BarData object. + + If require_unique == True, throw an error if multiple unique dt's are + encountered. Otherwise, return the earliest dt encountered. + """ + dts = {sid_data['dt'] for sid_data in bar_data.values()} + if require_unique and len(dts) > 1: + self.fail("Multiple unique dts ({0}) in {1}".format(dts, bar_data)) + + return sorted(dts)[0] + + @parameterized.expand( + [(name, + case['specs'], + case['sids'], + case['dt'], + case['updates'], + case['expected']) + for name, case in HISTORY_CONTAINER_TEST_CASES.items()] + ) + def test_history_container(self, + name, + specs, + sids, + dt, + updates, + expected): + + for spec in specs: + # Sanity check on test input. + self.assertEqual(len(expected[spec.key_str]), len(updates)) + + container = HistoryContainer( + {spec.key_str: spec for spec in specs}, sids, dt + ) + + for update_count, update in enumerate(updates): + + bar_dt = self.bar_data_dt(update) + container.update(update, bar_dt) + + for spec in specs: + pd.util.testing.assert_frame_equal( + container.get_history(spec, bar_dt), + expected[spec.key_str][update_count], + check_dtype=False, + check_column_type=True, + check_index_type=True, + check_frame_type=True, + ) + def test_container_nans_and_daily_roll(self): - # set up trading environment - factory.create_simulation_parameters(num_days=4) spec = history.HistorySpec( bar_count=3, @@ -145,7 +219,7 @@ class TestHistoryContainer(TestCase): field='price', ffill=True ) - specs = {hash(spec): spec} + specs = {spec.key_str: spec} initial_sids = [1, ] initial_dt = pd.Timestamp( '2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC') @@ -154,7 +228,7 @@ class TestHistoryContainer(TestCase): specs, initial_sids, initial_dt) bar_data = BarData() - + container.update(bar_data, initial_dt) # Since there was no backfill because of no db. # And no first bar of data, so all values should be nans. prices = container.get_history(spec, initial_dt) @@ -169,7 +243,6 @@ class TestHistoryContainer(TestCase): 'price': 10, 'dt': second_bar_dt } - container.update(bar_data, second_bar_dt) prices = container.get_history(spec, second_bar_dt) @@ -288,7 +361,6 @@ def handle_data(context, data): # 12 13 14 15 16 17 18 # 19 20 21 22 23 24 25 # 26 27 28 29 30 31 - start = pd.Timestamp('2006-03-20', tz='UTC') end = pd.Timestamp('2006-03-21', tz='UTC') diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 098237c5..62f392a0 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -18,6 +18,7 @@ import logbook import datetime import pandas as pd +import numpy as np from zipline.data.loader import load_market_data from zipline.utils import tradingcalendar @@ -174,11 +175,39 @@ class TradingEnvironment(object): return None + def previous_trading_day(self, test_date): + dt = self.normalize_date(test_date) + delta = datetime.timedelta(days=-1) + + while self.first_trading_day < test_date: + dt += delta + if dt in self.trading_days: + return dt + + return None + def days_in_range(self, start, end): mask = ((self.trading_days >= start) & (self.trading_days <= end)) return self.trading_days[mask] + def minutes_for_days_in_range(self, start, end): + """ + Get all market minutes for the days between start and end, inclusive. + """ + start_date = self.normalize_date(start) + end_date = self.normalize_date(end) + + all_minutes = [] + for day in self.days_in_range(start_date, end_date): + day_minutes = self.market_minutes_for_day(day) + all_minutes.append(day_minutes) + + # Concatenate all minutes and truncate minutes before start/after end. + return pd.DatetimeIndex( + np.concatenate(all_minutes), copy=False, tz='UTC', + ) + def next_open_and_close(self, start_date): """ Given the start_date, returns the next open and close of @@ -193,15 +222,104 @@ Last successful date: %s" % self.last_trading_day) return self.get_open_and_close(next_open) + def previous_open_and_close(self, start_date): + """ + Given the start_date, returns the previous open and close of the + market. + """ + previous = self.previous_trading_day(start_date) + + if previous is None: + raise NoFurtherDataError( + "Attempt to backtest beyond available history. " + "First successful date: %s" % self.first_trading_day) + return self.get_open_and_close(previous) + + def next_market_minute(self, start): + """ + Get the next market minute after @start. This is either the immediate + next minute, or the open of the next market day after start. + """ + next_minute = start + datetime.timedelta(minutes=1) + if self.is_market_hours(next_minute): + return next_minute + return self.next_open_and_close(start)[0] + + def previous_market_minute(self, start): + """ + Get the next market minute before @start. This is either the immediate + previous minute, or the close of the market day before start. + """ + prev_minute = start - datetime.timedelta(minutes=1) + if self.is_market_hours(prev_minute): + return prev_minute + return self.previous_open_and_close(start)[1] + def get_open_and_close(self, day): todays_minutes = self.open_and_closes.ix[day.date()] return todays_minutes['market_open'], todays_minutes['market_close'] - def market_minutes_for_day(self, midnight): - market_open, market_close = self.get_open_and_close(midnight) + def market_minutes_for_day(self, stamp): + market_open, market_close = self.get_open_and_close(stamp) return pd.date_range(market_open, market_close, freq='T') + def open_close_window(self, start, count, offset=0, step=1): + """ + Return a DataFrame containing `count` market opens and closes, + beginning with `start` + `offset` days and continuing `step` minutes at + a time. + """ + # TODO: Correctly handle end of data. + start_idx = self.get_index(start) + offset + stop_idx = start_idx + (count * step) + + index = np.arange(start_idx, stop_idx, step) + + return self.open_and_closes.iloc[index] + + def market_minute_window(self, start, count, step=1): + """ + Return a DatetimeIndex containing `count` market minutes, starting with + `start` and continuing `step` minutes at a time. + """ + if not self.is_market_hours(start): + raise ValueError("market_minute_window starting at " + "non-market time {minute}".format(minute=start)) + + all_minutes = [] + + current_day_minutes = self.market_minutes_for_day(start) + first_minute_idx = current_day_minutes.searchsorted(start) + minutes_in_range = current_day_minutes[first_minute_idx::step] + + # Build up list of lists of days' market minutes until we have count + # minutes stored altogether. + while True: + + if len(minutes_in_range) >= count: + # Truncate off extra minutes + minutes_in_range = minutes_in_range[:count] + + all_minutes.append(minutes_in_range) + count -= len(minutes_in_range) + if count <= 0: + break + + if step > 0: + start, _ = self.next_open_and_close(start) + current_day_minutes = self.market_minutes_for_day(start) + else: + _, start = self.previous_open_and_close(start) + current_day_minutes = self.market_minutes_for_day(start) + + minutes_in_range = current_day_minutes[::step] + + # Concatenate all the accumulated minutes. + return pd.DatetimeIndex( + np.concatenate(all_minutes), copy=False, tz='UTC', + ) + def trading_day_distance(self, first_date, second_date): first_date = self.normalize_date(first_date) second_date = self.normalize_date(second_date) diff --git a/zipline/history/history.py b/zipline/history/history.py index dcf49694..e5b147b3 100644 --- a/zipline/history/history.py +++ b/zipline/history/history.py @@ -32,19 +32,183 @@ class Frequency(object): Represents how the data is sampled, as specified by the algoscript via units like "1d", "1m", etc. - Currently only one frequency is supported, "1d" - "1d" provides data keyed by closing, and the last minute of the current - day. + Currently only two frequencies are supported, "1d" and "1m" + + - "1d" provides data at daily frequency, with the latest bar aggregating + the elapsed minutes of the (incomplete) current day + - "1m" provides data at minute frequency """ + SUPPORTED_FREQUENCIES = frozenset({'1d', '1m'}) + MAX_MINUTES = {'m': 1, 'd': 390} def __init__(self, freq_str): + + if freq_str not in self.SUPPORTED_FREQUENCIES: + raise ValueError( + "history frequency must be in {supported}".format( + supported=self.SUPPORTED_FREQUENCIES, + )) # The string the at the algoscript specifies. # Hold onto to use a key for caching. self.freq_str = freq_str + # num - The number of units of the frequency. # unit_str - The unit type, e.g. 'd' self.num, self.unit_str = parse_freq_str(freq_str) + def next_window_start(self, previous_window_close): + """ + Get the first minute of the window starting after a window that + finished on @previous_window_close. + """ + if self.unit_str == 'd': + return self.next_day_window_start(previous_window_close) + elif self.unit_str == 'm': + return self.next_minute_window_start(previous_window_close) + + @staticmethod + def next_day_window_start(previous_window_close): + """ + Get the next day window start after @previous_window_close. This is + defined as the first market open strictly greater than + @previous_window_close. + """ + env = trading.environment + next_open, _ = env.next_open_and_close(previous_window_close) + return next_open + + @staticmethod + def next_minute_window_start(previous_window_close): + """ + Get the next minute window start after @previous_window_close. This is + defined as the first market minute strictly greater than + @previous_window_close. + """ + env = trading.environment + return env.next_market_minute(previous_window_close) + + def window_open(self, window_close): + """ + For a period ending on `window_end`, calculate the date of the first + minute bar that should be used to roll a digest for this frequency. + """ + if self.unit_str == 'd': + return self.day_window_open(window_close, self.num) + elif self.unit_str == 'm': + return self.minute_window_open(window_close, self.num) + + def window_close(self, window_start): + """ + For a period starting on `window_start`, calculate the date of the last + minute bar that should be used to roll a digest for this frequency. + """ + if self.unit_str == 'd': + return self.day_window_close(window_start, self.num) + elif self.unit_str == 'm': + return self.minute_window_close(window_start, self.num) + + @staticmethod + def day_window_open(window_close, num_days): + """ + Get the first minute for a daily window of length @num_days with last + minute @window_close. This is calculated by searching backward until + @num_days market_closes are encountered. + """ + env = trading.environment + open_ = env.open_close_window( + window_close, + 1, + offset=-(num_days - 1) + ).market_open.iloc[0] + return open_ + + @staticmethod + def minute_window_open(window_close, num_minutes): + """ + Get the first minute for a minutely window of length @num_minutes with + last minute @window_close. + + This is defined as window_close if num_minutes == 1, and otherwise as + the N-1st market minute after @window_start. + """ + if num_minutes == 1: + # Short circuit this case. + return window_close + + env = trading.environment + return env.market_minute_window(window_close, count=-num_minutes)[-1] + + @staticmethod + def day_window_close(window_start, num_days): + """ + Get the last minute for a daily window of length @num_days with first + minute @window_start. This is calculated by searching forward until + @num_days market closes are encountered. + + Examples: + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 1 + --> window_close = Thursday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 3:59 AM EST + num_days = 1 + --> window_close = Thursday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 2 + --> window_close = Friday March 2nd, 2006, 4:00 PM EST + + window_start = Thursday March 2nd, 2006, 9:31 AM EST + num_days = 3 + --> window_close = Monday March 6th, 2006, 4:00 PM EST + + # Day before July 4th is an early close + window_start = Wednesday July 3rd, 2013, 9:31 AM EST + num_days = 1 + --> window_close = Wednesday July 3rd, 2013, 1:00 PM EST + """ + env = trading.environment + close = env.open_close_window( + window_start, + 1, + offset=num_days - 1 + ).market_close.iloc[0] + return close + + @staticmethod + def minute_window_close(window_start, num_minutes): + """ + Get the last minute for a minutely window of length @num_minutes with + first minute @window_start. + + This is defined as window_start if num_minutes == 1, and otherwise as + the N-1st market minute after @window_start. + """ + if num_minutes == 1: + # Short circuit this case. + return window_start + + env = trading.environment + return env.market_minute_window(window_start, count=num_minutes)[-1] + + @property + def max_minutes(self): + """ + The maximum number of minutes required to roll a bar at this frequency. + """ + return self.MAX_MINUTES[self.unit_str] * self.num + + def __eq__(self, other): + return self.freq_str == other.freq_str + + def __hash__(self): + return hash(self.freq_str) + + def __repr__(self): + return ''.join([str(self.__class__.__name__), + "('", self.freq_str, "')"]) + class HistorySpec(object): """ @@ -55,6 +219,8 @@ class HistorySpec(object): result frames. """ + FORWARD_FILLABLE = frozenset({'price'}) + @classmethod def spec_key(cls, bar_count, freq_str, field, ffill): """ @@ -73,63 +239,74 @@ class HistorySpec(object): # The field, e.g. 'price', 'volume', etc. self.field = field # Whether or not to forward fill the nan data. - self.ffill = ffill - - # How many trading days the spec needs to look back. - # Used by index creation to see how large of an overarching window - # is needed. - self.days_needed = calculate_days_needed( - self.bar_count, self.frequency) + self._ffill = ffill # Calculate the cache key string once. self.key_str = self.spec_key( bar_count, frequency.freq_str, field, ffill) + @property + def ffill(self): + """ + Wrapper around ffill that returns False for fields which are not + forward-fillable. + """ + return self._ffill and self.field in self.FORWARD_FILLABLE -def calculate_days_needed(bar_count, freq): - """ Returns number trading days needed. - Overshoots so that we more than enough to sample from the current - frequency slot plus previous ones. + def __repr__(self): + return ''.join([self.__class__.__name__, "('", self.key_str, "')"]) + + +def days_index_at_dt(history_spec, algo_dt): """ - if freq.unit_str == 'd': - return bar_count * freq.num - - -def days_index_at_dt(days_needed, algo_dt): - """ - The timestamps of previous days closes with the size of @days_needed - at @algo_dt. + Get the index of a frame to be used for a get_history call with daily + frequency. """ env = trading.environment + # Get the previous (bar_count - 1) days' worth of market closes. + day_delta = (history_spec.bar_count - 1) * history_spec.frequency.num + market_closes = env.open_close_window( + algo_dt, + day_delta, + offset=(-day_delta), + step=history_spec.frequency.num, + ).market_close - latest_algo_dt = algo_dt - - current_index = env.open_and_closes.index.searchsorted(algo_dt.date()) - - previous_days_num = days_needed - 1 - - previous_days = env.open_and_closes['market_close'][ - current_index - previous_days_num:current_index] - + # Append the current algo_dt as the last index value. # Using the 'rawer' numpy array values here because of a bottleneck # that appeared when using DatetimeIndex - return np.append(previous_days.values, latest_algo_dt) + return np.append(market_closes.values, algo_dt) + + +def minutes_index_at_dt(history_spec, algo_dt): + """ + Get the index of a frame to be used for a get_history_call with minutely + frequency. + """ + # TODO: This is almost certainly going to be too slow for production. + env = trading.environment + return env.market_minute_window( + algo_dt, + history_spec.bar_count, + step=-1, + )[::-1] def index_at_dt(history_spec, algo_dt): """ - The index, including @algo_dt at the given @algo_dt for the count - and frequency of the @history_spec. + Returns index of a frame returned by get_history() with the given + history_spec and algo_dt. + + The resulting index `@history_spec.bar_count` bars, increasing in units of + `@history_spec.frequency`, terminating at the given @algo_dt. + + Note: The last bar of the returned frame represents an as-of-yet incomplete + time window, so the delta between the last and second-to-last bars is + usually always less than `@history_spec.frequency` for frequencies greater + than 1m. """ - days_index = days_index_at_dt(history_spec.days_needed, algo_dt) - frequency = history_spec.frequency - if frequency.unit_str == 'd': - - index_of_algo_dt = days_index.searchsorted(algo_dt) - - start_index = index_of_algo_dt + 1 - history_spec.bar_count - end_index = index_of_algo_dt + 1 - - return days_index[start_index:end_index] + return days_index_at_dt(history_spec, algo_dt) + elif frequency.unit_str == 'm': + return minutes_index_at_dt(history_spec, algo_dt) diff --git a/zipline/history/history_container.py b/zipline/history/history_container.py index e7b74b86..38811c88 100644 --- a/zipline/history/history_container.py +++ b/zipline/history/history_container.py @@ -12,56 +12,124 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from itertools import groupby import numpy as np import pandas as pd -from six import itervalues + +from six import itervalues, iteritems, iterkeys from . history import ( index_at_dt, - days_index_at_dt, ) from zipline.finance import trading from zipline.utils.data import RollingPanel + # The closing price is referred to by multiple names, # allow both for price rollover logic etc. -CLOSING_PRICE_FIELDS = {'price', 'close_price'} +CLOSING_PRICE_FIELDS = frozenset({'price', 'close_price'}) -def create_initial_day_panel(days_needed, fields, sids, dt): - index = days_index_at_dt(days_needed, dt) - # Use original index in case of 1 bar. - if days_needed != 1: - index = index[:-1] - window = len(index) - rp = RollingPanel(window, fields, sids) - for i, day in enumerate(index): - rp.index_buf[i] = day - rp.pos = window - return rp +def ffill_buffer_from_prior_values(field, + buffer_frame, + digest_frame, + pre_digest_values): + """ + Forward-fill a buffer frame, falling back to the end-of-period values of a + digest frame if the buffer frame has leading NaNs. + """ + + # Get values which are NaN at the beginning of the period. + first_bar = buffer_frame.iloc[0] + + def iter_nan_sids(): + """ + Helper for iterating over the remaining nan sids in first_bar. + """ + return (sid for sid in first_bar[first_bar.isnull()].index) + + # Try to fill with the last entry from the digest frame. + if digest_frame is not None: + # We don't store a digest frame for frequencies that only have a bar + # count of 1. + for sid in iter_nan_sids(): + buffer_frame[sid][0] = digest_frame.ix[-1, sid] + + # If we still have nan sids, try to fill with pre_digest_values. + for sid in iter_nan_sids(): + prior_sid_value = pre_digest_values[field].get(sid) + if prior_sid_value: + # If the prior value is greater than the timestamp of our first + # bar. + if prior_sid_value.get('dt', first_bar.name) > first_bar.name: + buffer_frame[sid][0] = prior_sid_value.get('value', np.nan) + + return buffer_frame.ffill() -def create_current_day_panel(fields, sids, dt): - # Can't use open_and_close since need to create enough space for a full - # day, even on a half day. - # Can now use mkt open and close, since we don't roll - env = trading.environment - index = env.market_minutes_for_day(dt) - return pd.Panel(items=fields, minor_axis=sids, major_axis=index) +def ffill_digest_frame_from_prior_values(field, digest_frame, prior_values): + """ + Forward-fill a digest frame, falling back to the last known priof values if + necessary. + """ + if digest_frame is not None: + # Digest frame is None in the case that we only have length 1 history + # specs for a given frequency. + + # It's possible that the first bar in our digest frame is storing NaN + # values. If so, check if we've tracked an older value and use that as + # an ffill value for the first bar. + first_bar = digest_frame.ix[0] + nan_sids = first_bar[first_bar.isnull()].index + for sid in nan_sids: + try: + # Only use prior value if it is before the index, + # so that a backfill does not accidentally occur. + if prior_values[field][sid]['dt'] <= digest_frame.index[0]: + digest_frame[sid][0] = prior_values[field][sid]['value'] + + except KeyError: + # Allow case where there is no previous value. + # e.g. with leading nans. + pass + digest_frame = digest_frame.ffill() + return digest_frame -def ffill_day_frame(field, day_frame, prior_day_frame): - # get values which are nan-at the beginning of the day - # and attempt to fill with the last close - first_bar = day_frame.ix[0] - nan_sids = first_bar[np.isnan(first_bar)] - for sid, _ in nan_sids.iterkv(): - day_frame[sid][0] = prior_day_frame.ix[-1, sid] - if field != 'volume': - day_frame = day_frame.ffill() - return day_frame +def freq_str_and_bar_count(history_spec): + """ + Helper for getting the frequency string from a history spec. + """ + return (history_spec.frequency.freq_str, history_spec.bar_count) + + +def group_by_frequency(history_specs): + """ + Takes an iterable of history specs and returns a dictionary mapping unique + frequencies to a list of specs with that frequency. + + Within each list, the HistorySpecs are sorted by ascending bar count. + + Example: + + [HistorySpec(3, '1d', 'price', True), + HistorySpec(2, '2d', 'open', True), + HistorySpec(2, '1d', 'open', False), + HistorySpec(5, '1m', 'open', True)] + + yields + + {Frequency('1d') : [HistorySpec(2, '1d', 'open', False)], + HistorySpec(3, '1d', 'price', True), + Frequency('2d') : [HistorySpec(2, '2d', 'open', True)], + Frequency('1m') : [HistorySpec(5, '1m', 'open', True)]} + """ + return {key: list(group) + for key, group in groupby( + sorted(history_specs, key=freq_str_and_bar_count), + key=lambda spec: spec.frequency)} class HistoryContainer(object): @@ -78,35 +146,105 @@ class HistoryContainer(object): # History specs to be served by this container. self.history_specs = history_specs - - # The overaching panel needs to be large enough to contain the - # largest history spec - self.max_days_needed = max(spec.days_needed for spec - in itervalues(history_specs)) + self.frequency_groups = \ + group_by_frequency(itervalues(self.history_specs)) # The set of fields specified by all history specs self.fields = set(spec.field for spec in itervalues(history_specs)) - self.prior_day_panel = create_initial_day_panel( - self.max_days_needed, self.fields, initial_sids, initial_dt) + # This panel contains raw minutes for periods that haven't been fully + # completed. When a frequency period rolls over, these minutes are + # digested using some sort of aggregation call on the panel (e.g. `sum` + # for volume, `max` for high, `min` for low, etc.). + self.buffer_panel = self.create_buffer_panel( + initial_sids, + initial_dt, + ) - # This panel contains the minutes for the current day. - # The value that is used is some sort of aggregation call on the - # panel, e.g. `sum` for volume, `max` for high, etc. - self.current_day_panel = create_current_day_panel( - self.fields, initial_sids, initial_dt) + # Dictionaries with Frequency objects as keys. + self.digest_panels, self.cur_window_starts, self.cur_window_closes = \ + self.create_digest_panels(initial_sids, initial_dt) + + # Populating initial frames here, so that the cost of creating the + # initial frames does not show up when profiling. These frames are + # cached since mid-stream creation of containing data frames on every + # bar is expensive. + self.create_return_frames(initial_dt) # Helps prop up the prior day panel against having a nan, when the data # has been seen. self.last_known_prior_values = {field: {} for field in self.fields} - # Populating initial frames here, so that the cost of creating the - # initial frames does not show up when profiling get_y - # These frames are cached since mid-stream creation of containing - # data frames on every bar is expensive. - self.return_frames = {} + @property + def unique_frequencies(self): + """ + Return an iterator over all the unique frequencies serviced by this + container. + """ + return iterkeys(self.frequency_groups) - self.create_return_frames(initial_dt) + def create_digest_panels(self, initial_sids, initial_dt): + """ + Initialize a RollingPanel for each unique panel frequency being stored + by this container. Each RollingPanel pre-allocates enough storage + space to service the highest bar-count of any history call that it + serves. + + Relies on the fact that group_by_frequency sorts the value lists by + ascending bar count. + """ + # Map from frequency -> first/last minute of the next digest to be + # rolled for that frequency. + first_window_starts = {} + first_window_closes = {} + + # Map from frequency -> digest_panels. + panels = {} + for freq, specs in iteritems(self.frequency_groups): + + # Relying on the sorting of group_by_frequency to get the spec + # requiring the largest number of bars. + largest_spec = specs[-1] + if largest_spec.bar_count == 1: + # No need to allocate a digest panel; this frequency will only + # ever use data drawn from self.buffer_panel. + env = trading.environment + first_window_closes[freq] = \ + env.get_open_and_close(initial_dt)[1] + first_window_starts[freq] = \ + freq.window_open(first_window_closes[freq]) + continue + + initial_dates = index_at_dt(largest_spec, initial_dt) + + # Set up dates for our first digest roll, which is keyed to the + # close of the first entry in our initial index. + first_window_closes[freq] = initial_dates[0] + first_window_starts[freq] = freq.window_open(initial_dates[0]) + + rp = RollingPanel(len(initial_dates) - 1, + self.fields, + initial_sids) + + panels[freq] = rp + + return panels, first_window_starts, first_window_closes + + def create_buffer_panel(self, initial_sids, initial_dt): + """ + Initialize a RollingPanel containing enough minutes to service all our + frequencies. + """ + max_bars_needed = max(freq.max_minutes + for freq in self.unique_frequencies) + rp = RollingPanel( + max_bars_needed, + self.fields, + initial_sids, + # Restrict the initial data down to just the fields being used in + # this container. + ) + return rp def create_return_frames(self, algo_dt): """ @@ -114,101 +252,158 @@ class HistoryContainer(object): Called during init and at universe rollovers. """ - for history_spec in itervalues(self.history_specs): - index = index_at_dt(history_spec, algo_dt) - index = pd.to_datetime(index) + self.return_frames = {} + for spec_key, history_spec in iteritems(self.history_specs): + index = pd.to_datetime(index_at_dt(history_spec, algo_dt)) frame = pd.DataFrame( index=index, - columns=map(int, self.current_day_panel.minor_axis.values), + columns=map(int, self.buffer_panel.minor_axis.values), dtype=np.float64) - self.return_frames[history_spec] = frame + self.return_frames[spec_key] = frame + + def buffer_panel_minutes(self, + buffer_panel=None, + earliest_minute=None, + latest_minute=None): + """ + Get the minutes in @buffer_panel between @earliest_minute and + @last_minute, inclusive. + + @buffer_panel can be a RollingPanel or a plain Panel. If a + RollingPanel is supplied, we call `get_current` to extract a Panel + object. If no panel is supplied, we use self.buffer_panel. + + If no value is specified for @earliest_minute, use all the minutes we + have up until @latest minute. + + If no value for @latest_minute is specified, use all values up until + the latest minute. + """ + buffer_panel = buffer_panel or self.buffer_panel + if isinstance(buffer_panel, RollingPanel): + buffer_panel = buffer_panel.get_current() + + return buffer_panel.ix[:, earliest_minute:latest_minute, :] def update(self, data, algo_dt): """ - Takes the bar at @algo_dt's @data and adds to the current day panel. + Takes the bar at @algo_dt's @data, checks to see if we need to roll any + new digests, then adds new data to the buffer panel. """ - self.check_and_roll(algo_dt) + self.update_digest_panels(algo_dt, self.buffer_panel) fields = self.fields - field_data = {sid: {field: bar[field] for field in fields} - for sid, bar in data.iteritems() - if (bar - and - bar['dt'] == algo_dt - and - # Only use data which is keyed in the data panel. - # Prevents crashes due to custom data. - sid in self.current_day_panel.minor_axis)} - field_frame = pd.DataFrame(field_data) - self.current_day_panel.ix[:, algo_dt, :] = field_frame.T + frame = pd.DataFrame( + {sid: {field: bar[field] for field in fields} + for sid, bar in data.iteritems() + if (bar + and + bar['dt'] == algo_dt + and + # Only use data which is keyed in the data panel. + # Prevents crashes due to custom data. + sid in self.buffer_panel.minor_axis)}) + self.buffer_panel.add_frame(algo_dt, frame) + + def update_digest_panels(self, algo_dt, buffer_panel): + """ + Check whether @algo_dt is greater than cur_window_close for any of our + frequencies. If so, roll a digest for that frequency using data drawn + from @buffer panel and insert it into the appropriate digest panels. + """ + for frequency in self.unique_frequencies: + + # We don't keep a digest panel if we only have a length-1 history + # spec for a given frequency + digest_panel = self.digest_panels.get(frequency, None) + + while algo_dt > self.cur_window_closes[frequency]: + + earliest_minute = self.cur_window_starts[frequency] + latest_minute = self.cur_window_closes[frequency] + minutes_to_process = self.buffer_panel_minutes( + buffer_panel, + earliest_minute=earliest_minute, + latest_minute=latest_minute, + ) + + # Create a digest from minutes_to_process and add it to + # digest_panel. + self.roll(frequency, + digest_panel, + minutes_to_process, + latest_minute) + + # Update panel start/close for this frequency. + self.cur_window_starts[frequency] = \ + frequency.next_window_start(latest_minute) + self.cur_window_closes[frequency] = \ + frequency.window_close(self.cur_window_starts[frequency]) + + def roll(self, frequency, digest_panel, buffer_minutes, digest_dt): + """ + Package up minutes in @buffer_minutes insert that bar into + @digest_panel at index @last_minute, and update + self.cur_window_{starts|closes} for the given frequency. + """ + if digest_panel is None: + # This happens if the only spec we have at this frequency has a bar + # count of 1. + return - def roll(self, roll_dt): - env = trading.environment - # This should work for price, but not others, e.g. - # open. - # Get the most recent value. rolled = pd.DataFrame( - index=self.current_day_panel.items, - columns=self.current_day_panel.minor_axis) + index=self.fields, + columns=buffer_minutes.minor_axis) for field in self.fields: + if field in CLOSING_PRICE_FIELDS: - # Use the last price. - prices = self.current_day_panel.ffill().ix[field, -1, :] + # Use the last close, or NaN if we have no minutes. + try: + prices = buffer_minutes.loc[field].ffill().iloc[-1] + except IndexError: + # Scalar assignment sets the value for all entries. + prices = np.nan rolled.ix[field] = prices + elif field == 'open_price': - # Use the first price. - opens = self.current_day_panel.ix['open_price', 0, :] + # Use the first open, or NaN if we have no minutes. + try: + opens = buffer_minutes.loc[field].bfill().iloc[0] + except IndexError: + # Scalar assignment sets the value for all entries. + opens = np.nan rolled.ix['open_price'] = opens + elif field == 'volume': # Volume is the sum of the volumes during the - # course of the day - volumes = self.current_day_panel.ix['volume'].apply(np.sum) + # course of the period. + volumes = buffer_minutes.ix['volume'].sum().fillna(0) rolled.ix['volume'] = volumes + elif field == 'high': # Use the highest high. - highs = self.current_day_panel.ix['high'].apply(np.max) + highs = buffer_minutes.ix['high'].max() rolled.ix['high'] = highs + elif field == 'low': # Use the lowest low. - lows = self.current_day_panel.ix['low'].apply(np.min) + lows = buffer_minutes.ix['low'].min() rolled.ix['low'] = lows for sid, value in rolled.ix[field].iterkv(): if not np.isnan(value): try: - prior_values = self.last_known_prior_values[field][sid] + prior_values = \ + self.last_known_prior_values[field][sid] except KeyError: prior_values = {} - self.last_known_prior_values[field][sid] = prior_values - prior_values['dt'] = roll_dt + self.last_known_prior_values[field][sid] = \ + prior_values + prior_values['dt'] = digest_dt prior_values['value'] = value - self.prior_day_panel.add_frame(roll_dt, rolled) - - # Create a new 'current day' collector. - next_day = env.next_trading_day(roll_dt) - - if next_day: - # Only create the next panel if there is a next day. - # i.e. don't create the next panel on the last day of - # the backest/current day of live trading. - self.current_day_panel = create_current_day_panel( - self.fields, - # Will break on quarter rollover. - self.current_day_panel.minor_axis, - next_day) - - def check_and_roll(self, algo_dt): - """ - Check whether the algo_dt is at the end of a day. - If it is, aggregate the day's minute data and store it in the prior - day panel. - """ - # Use a while loop to account for illiquid bars. - while algo_dt > self.current_day_panel.major_axis[-1]: - roll_dt = self.current_day_panel.major_axis[-1] - self.roll(roll_dt) + digest_panel.add_frame(digest_dt, rolled) def get_history(self, history_spec, algo_dt): """ @@ -217,57 +412,58 @@ class HistoryContainer(object): Selects from the overarching history panel the values for the @history_spec at the given @algo_dt. """ + field = history_spec.field + bar_count = history_spec.bar_count + do_ffill = history_spec.ffill - index = index_at_dt(history_spec, algo_dt) - index = pd.to_datetime(index) + index = pd.to_datetime(index_at_dt(history_spec, algo_dt)) + return_frame = self.return_frames[history_spec.key_str] - frame = self.return_frames[history_spec] # Overwrite the index. # Not worrying about values here since the values are overwritten # in the next step. - frame.index = index + return_frame.index = index - prior_day_panel = self.prior_day_panel.get_current() - prior_day_frame = prior_day_panel[field].copy() - if history_spec.ffill: - first_bar = prior_day_frame.ix[0] - nan_sids = first_bar[first_bar.isnull()] - for sid, _ in nan_sids.iterkv(): - try: - if ( - # Only use prior value if it is before the index, - # so that a backfill does not accidentally occur. - self.last_known_prior_values[field][sid]['dt'] <= - prior_day_frame.index[0]): - prior_day_frame[sid][0] =\ - self.last_known_prior_values[field][sid]['value'] - except KeyError: - # Allow case where there is no previous value. - # e.g. with leading nans. - pass - prior_day_frame = prior_day_frame.ffill() - frame.ix[:-1] = prior_day_frame.ix[:] + if bar_count > 1: + # Get the last bar_count - 1 frames from our stored historical + # frames. + digest_panel = self.digest_panels[history_spec.frequency]\ + .get_current() + digest_frame = digest_panel[field].copy().ix[1 - bar_count:] + else: + digest_frame = None - # Copy the current day frame, since the fill behavior will mutate - # the values in the panel. - current_day_frame = self.current_day_panel[field][:algo_dt].copy() - if history_spec.ffill: - current_day_frame = ffill_day_frame(field, - current_day_frame, - prior_day_frame) + # Get minutes from our buffer panel to build the last row. + buffer_frame = self.buffer_panel_minutes( + earliest_minute=self.cur_window_starts[history_spec.frequency], + )[field].copy() + + if do_ffill: + digest_frame = ffill_digest_frame_from_prior_values( + field, + digest_frame, + self.last_known_prior_values, + ) + buffer_frame = ffill_buffer_from_prior_values( + field, + buffer_frame, + digest_frame, + self.last_known_prior_values, + ) + + if digest_frame is not None: + return_frame.ix[:-1] = digest_frame.ix[:] if field == 'volume': - # This works for the day rollup, i.e. '1d', - # but '1m' will need to allow for 0 or nan minutes - frame.ix[algo_dt] = current_day_frame.sum() + return_frame.ix[algo_dt] = buffer_frame.fillna(0).sum() elif field == 'high': - frame.ix[algo_dt] = current_day_frame.max() + return_frame.ix[algo_dt] = buffer_frame.max() elif field == 'low': - frame.ix[algo_dt] = current_day_frame.min() + return_frame.ix[algo_dt] = buffer_frame.min() elif field == 'open_price': - frame.ix[algo_dt] = current_day_frame.ix[0] + return_frame.ix[algo_dt] = buffer_frame.iloc[0] else: - frame.ix[algo_dt] = current_day_frame.ix[algo_dt] + return_frame.ix[algo_dt] = buffer_frame.loc[algo_dt] - return frame + return return_frame diff --git a/zipline/protocol.py b/zipline/protocol.py index e3983de8..7d78119d 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -157,8 +157,8 @@ class BarData(object): usage of what this replaced as a dictionary subclass. """ - def __init__(self): - self._data = {} + def __init__(self, data=None): + self._data = data or {} self._contains_override = None def __contains__(self, name): @@ -217,3 +217,6 @@ class BarData(object): def __len__(self): return len(self.keys()) + + def __repr__(self): + return '{0}({1})'.format(self.__class__.__name__, self._data) diff --git a/zipline/utils/data.py b/zipline/utils/data.py index 67a3c4ae..5dbd8c31 100644 --- a/zipline/utils/data.py +++ b/zipline/utils/data.py @@ -32,8 +32,8 @@ class RollingPanel(object): Restrictions: major_axis can only be a DatetimeIndex for now """ - def __init__(self, window, items, sids, cap_multiple=2, - dtype=np.float64): + def __init__(self, window, items, sids, cap_multiple=2, dtype=np.float64): + self.pos = 0 self.window = window @@ -49,9 +49,14 @@ class RollingPanel(object): self.buffer = self._create_buffer() def _create_buffer(self): - return pd.Panel(items=self.items, minor_axis=self.minor_axis, - major_axis=range(self.cap), - dtype=self.dtype) + panel = pd.Panel( + items=self.items, + minor_axis=self.minor_axis, + major_axis=range(self.cap), + dtype=self.dtype, + ) + + return panel def _update_buffer(self, frame): # Drop outdated, nan-filled minors (sids) and items (fields)