Merge branch 'one_minute_history_for_realz'

This commit is contained in:
Scott Sanderson
2014-06-05 16:37:38 -04:00
8 changed files with 1566 additions and 298 deletions
+598
View File
@@ -0,0 +1,598 @@
"""
Test case definitions for history tests.
"""
import pandas as pd
import numpy as np
from zipline.finance.trading import TradingEnvironment
from zipline.history.history import HistorySpec
from zipline.protocol import BarData
def to_utc(time_str):
return pd.Timestamp(time_str, tz='US/Eastern').tz_convert('UTC')
def mixed_frequency_expected_index(count, frequency):
"""
Helper for enumerating expected indices for test_mixed_frequency.
"""
env = TradingEnvironment.instance()
minute = MIXED_FREQUENCY_MINUTES[count]
if frequency == '1d':
return [env.previous_open_and_close(minute)[1], minute]
elif frequency == '1m':
return [env.previous_market_minute(minute), minute]
def mixed_frequency_expected_data(count, frequency):
"""
Helper for enumerating expected data test_mixed_frequency.
"""
if frequency == '1d':
# First day of this test is July 3rd, which is a half day.
if count < 210:
return [np.nan, count]
else:
return [209, count]
elif frequency == '1m':
if count == 0:
return [np.nan, count]
else:
return [count - 1, count]
MIXED_FREQUENCY_MINUTES = TradingEnvironment.instance().market_minute_window(
to_utc('2013-07-03 9:31AM'), 600,
)
DAILY_OPEN_CLOSE_SPECS = [
HistorySpec(3, '1d', 'open_price', False),
HistorySpec(3, '1d', 'close_price', False),
]
ILLIQUID_PRICES_SPECS = [
HistorySpec(3, '1m', 'price', False),
HistorySpec(5, '1m', 'price', True),
]
MIXED_FREQUENCY_SPECS = [
HistorySpec(1, '1m', 'price', False),
HistorySpec(2, '1m', 'price', False),
HistorySpec(2, '1d', 'price', False),
]
MIXED_FIELDS_SPECS = [
HistorySpec(3, '1m', 'price', True),
HistorySpec(3, '1m', 'open_price', True),
HistorySpec(3, '1m', 'close_price', True),
HistorySpec(3, '1m', 'high', True),
HistorySpec(3, '1m', 'low', True),
HistorySpec(3, '1m', 'volume', True),
]
HISTORY_CONTAINER_TEST_CASES = {
# June 2013
# Su Mo Tu We Th Fr Sa
# 1
# 2 3 4 5 6 7 8
# 9 10 11 12 13 14 15
# 16 17 18 19 20 21 22
# 23 24 25 26 27 28 29
# 30
'test daily open close': {
# A list of HistorySpec objects.
'specs': DAILY_OPEN_CLOSE_SPECS,
# Sids for the test.
'sids': [1],
# Start date for test.
'dt': to_utc('2013-06-21 9:31AM'),
# Sequence of updates to the container
'updates': [
BarData(
{
1: {
'open_price': 10,
'close_price': 11,
'dt': to_utc('2013-06-21 10:00AM'),
},
},
),
BarData(
{
1: {
'open_price': 12,
'close_price': 13,
'dt': to_utc('2013-06-21 3:30PM'),
},
},
),
BarData(
{
1: {
'open_price': 14,
'close_price': 15,
# Wait a full market day before the next bar.
# We should end up with nans for Monday the 24th.
'dt': to_utc('2013-06-25 9:31AM'),
},
},
),
],
# Dictionary mapping spec_key -> list of expected outputs
'expected': {
# open
DAILY_OPEN_CLOSE_SPECS[0].key_str: [
pd.DataFrame(
data={
1: [np.nan, np.nan, 10]
},
index=[
to_utc('2013-06-19 4:00PM'),
to_utc('2013-06-20 4:00PM'),
to_utc('2013-06-21 10:00AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, np.nan, 10]
},
index=[
to_utc('2013-06-19 4:00PM'),
to_utc('2013-06-20 4:00PM'),
to_utc('2013-06-21 3:30PM'),
],
),
pd.DataFrame(
data={
1: [10, np.nan, 14]
},
index=[
to_utc('2013-06-21 4:00PM'),
to_utc('2013-06-24 4:00PM'),
to_utc('2013-06-25 9:31AM'),
],
),
],
# close
DAILY_OPEN_CLOSE_SPECS[1].key_str: [
pd.DataFrame(
data={
1: [np.nan, np.nan, 11]
},
index=[
to_utc('2013-06-19 4:00PM'),
to_utc('2013-06-20 4:00PM'),
to_utc('2013-06-21 10:00AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, np.nan, 13]
},
index=[
to_utc('2013-06-19 4:00PM'),
to_utc('2013-06-20 4:00PM'),
to_utc('2013-06-21 3:30PM'),
],
),
pd.DataFrame(
data={
1: [13, np.nan, 15]
},
index=[
to_utc('2013-06-21 4:00PM'),
to_utc('2013-06-24 4:00PM'),
to_utc('2013-06-25 9:31AM'),
],
),
],
},
},
'test illiquid prices': {
# A list of HistorySpec objects.
'specs': ILLIQUID_PRICES_SPECS,
# Sids for the test.
'sids': [1],
# Start date for test.
'dt': to_utc('2013-06-28 9:31AM'),
# Sequence of updates to the container
'updates': [
BarData(
{
1: {
'price': 10,
'dt': to_utc('2013-06-28 9:31AM'),
},
},
),
BarData(
{
1: {
'price': 11,
'dt': to_utc('2013-06-28 9:32AM'),
},
},
),
BarData(
{
1: {
'price': 12,
'dt': to_utc('2013-06-28 9:33AM'),
},
},
),
BarData(
{
1: {
'price': 13,
# Note: Skipping 9:34 to simulate illiquid bar/missing
# data.
'dt': to_utc('2013-06-28 9:35AM'),
},
},
),
],
# Dictionary mapping spec_key -> list of expected outputs
'expected': {
ILLIQUID_PRICES_SPECS[0].key_str: [
pd.DataFrame(
data={
1: [np.nan, np.nan, 10],
},
index=[
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, 10, 11],
},
index=[
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
),
pd.DataFrame(
data={
1: [10, 11, 12],
},
index=[
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
],
),
# Since there's no update for 9:34, this is called at 9:35.
pd.DataFrame(
data={
1: [12, np.nan, 13],
},
index=[
to_utc('2013-06-28 9:33AM'),
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
),
],
ILLIQUID_PRICES_SPECS[1].key_str: [
pd.DataFrame(
data={
1: [np.nan, np.nan, np.nan, np.nan, 10],
},
index=[
to_utc('2013-06-27 3:57PM'),
to_utc('2013-06-27 3:58PM'),
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, np.nan, np.nan, 10, 11],
},
index=[
to_utc('2013-06-27 3:58PM'),
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, np.nan, 10, 11, 12],
},
index=[
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
],
),
# Since there's no update for 9:34, this is called at 9:35.
# The 12 value from 9:33 should be forward-filled.
pd.DataFrame(
data={
1: [10, 11, 12, 12, 13],
},
index=[
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
),
],
},
},
'test mixed frequencies': {
# A list of HistorySpec objects.
'specs': MIXED_FREQUENCY_SPECS,
# Sids for the test.
'sids': [1],
# Start date for test.
# July 2013
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6
# 7 8 9 10 11 12 13
# 14 15 16 17 18 19 20
# 21 22 23 24 25 26 27
# 28 29 30 31
'dt': to_utc('2013-07-03 9:31AM'),
# Sequence of updates to the container
'updates': [
BarData(
{
1: {
'price': count,
'dt': dt,
}
}
)
for count, dt in enumerate(MIXED_FREQUENCY_MINUTES)
],
# Dictionary mapping spec_key -> list of expected outputs.
'expected': {
MIXED_FREQUENCY_SPECS[0].key_str: [
pd.DataFrame(
data={
1: [count],
},
index=[minute],
)
for count, minute in enumerate(MIXED_FREQUENCY_MINUTES)
],
MIXED_FREQUENCY_SPECS[1].key_str: [
pd.DataFrame(
data={
1: mixed_frequency_expected_data(count, '1m'),
},
index=mixed_frequency_expected_index(count, '1m'),
)
for count in range(len(MIXED_FREQUENCY_MINUTES))
],
MIXED_FREQUENCY_SPECS[2].key_str: [
pd.DataFrame(
data={
1: mixed_frequency_expected_data(count, '1d'),
},
index=mixed_frequency_expected_index(count, '1d'),
)
for count in range(len(MIXED_FREQUENCY_MINUTES))
]
},
},
'test multiple fields and sids': {
# A list of HistorySpec objects.
'specs': MIXED_FIELDS_SPECS,
# Sids for the test.
'sids': [1, 10],
# Start date for test.
'dt': to_utc('2013-06-28 9:31AM'),
# Sequence of updates to the container
'updates': [
BarData(
{
1: {
'dt': dt,
'price': count,
'open_price': count,
'close_price': count,
'high': count,
'low': count,
'volume': count,
},
10: {
'dt': dt,
'price': count * 10,
'open_price': count * 10,
'close_price': count * 10,
'high': count * 10,
'low': count * 10,
'volume': count * 10,
},
},
)
for count, dt in enumerate([
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
# NOTE: No update for 9:34
to_utc('2013-06-28 9:35AM'),
])
],
# Dictionary mapping spec_key -> list of expected outputs
'expected': dict(
# Build a dict from a list of tuples. Doing it this way because
# there are two distinct cases we want to test: forward-fillable
# fields and non-forward-fillable fields.
[
(
# Non forward-fill fields
key,
[
pd.DataFrame(
data={
1: [np.nan, np.nan, 0],
10: [np.nan, np.nan, 0],
},
index=[
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
],
# Missing volume data should manifest as 0's rather
# than nans.
).fillna(0 if 'volume' in key else np.nan),
pd.DataFrame(
data={
1: [np.nan, 0, 1],
10: [np.nan, 0, 10],
},
index=[
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
).fillna(0 if 'volume' in key else np.nan),
pd.DataFrame(
data={
1: [0, 1, 2],
10: [0, 10, 20],
},
index=[
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
],
# Note: Calling fillna() here even though there are
# no NaNs because this makes it less likely
# for us to introduce a stupid bug by
# copy/pasting in the future.
).fillna(0 if 'volume' in key else np.nan),
pd.DataFrame(
data={
1: [2, np.nan, 3],
10: [20, np.nan, 30],
},
index=[
to_utc('2013-06-28 9:33AM'),
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
).fillna(0 if 'volume' in key else np.nan),
],
)
for key in [spec.key_str for spec in MIXED_FIELDS_SPECS
if spec.field not in HistorySpec.FORWARD_FILLABLE]
]
+ # Concatenate the expected results for non-ffillable with
# expected result for ffillable.
[
(
# Forward-fillable fields
key,
[
pd.DataFrame(
data={
1: [np.nan, np.nan, 0],
10: [np.nan, np.nan, 0],
},
index=[
to_utc('2013-06-27 3:59PM'),
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
],
),
pd.DataFrame(
data={
1: [np.nan, 0, 1],
10: [np.nan, 0, 10],
},
index=[
to_utc('2013-06-27 4:00PM'),
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
],
),
pd.DataFrame(
data={
1: [0, 1, 2],
10: [0, 10, 20],
},
index=[
to_utc('2013-06-28 9:31AM'),
to_utc('2013-06-28 9:32AM'),
to_utc('2013-06-28 9:33AM'),
],
),
pd.DataFrame(
data={
1: [2, 2, 3],
10: [20, 20, 30],
},
index=[
to_utc('2013-06-28 9:33AM'),
to_utc('2013-06-28 9:34AM'),
to_utc('2013-06-28 9:35AM'),
],
),
],
)
for key in [spec.key_str for spec in MIXED_FIELDS_SPECS
if spec.field in HistorySpec.FORWARD_FILLABLE]
]
),
},
}
+181 -82
View File
@@ -40,6 +40,7 @@ from zipline.finance.blotter import Blotter
from zipline.gens.composites import date_sorted_sources
from zipline.finance import trading
from zipline.finance.trading import TradingEnvironment
from zipline.finance.execution import MarketOrder, LimitOrder
from zipline.finance.trading import SimulationParameters
@@ -80,88 +81,6 @@ class FinanceTestCase(TestCase):
self.assertTrue(trade.dt > prev.dt)
prev = trade
@timed(DEFAULT_TIMEOUT)
def test_trading_environment(self):
# holidays taken from: http://www.nyse.com/press/1191407641943.html
new_years = datetime(2008, 1, 1, tzinfo=pytz.utc)
mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc)
presidents = datetime(2008, 2, 18, tzinfo=pytz.utc)
good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc)
memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc)
july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc)
labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc)
tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc)
christmas = datetime(2008, 5, 25, tzinfo=pytz.utc)
a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc)
a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc)
holidays = [
new_years,
mlk_day,
presidents,
good_friday,
memorial_day,
july_4th,
labor_day,
tgiving,
christmas,
a_saturday,
a_sunday
]
for holiday in holidays:
self.assertTrue(not trading.environment.is_trading_day(holiday))
first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc)
last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc)
workdays = [first_trading_day, last_trading_day]
for workday in workdays:
self.assertTrue(trading.environment.is_trading_day(workday))
def test_simulation_parameters(self):
env = SimulationParameters(
period_start=datetime(2008, 1, 1, tzinfo=pytz.utc),
period_end=datetime(2008, 12, 31, tzinfo=pytz.utc),
capital_base=100000,
)
self.assertTrue(env.last_close.month == 12)
self.assertTrue(env.last_close.day == 31)
@timed(DEFAULT_TIMEOUT)
def test_sim_params_days_in_period(self):
# January 2008
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
env = SimulationParameters(
period_start=datetime(2007, 12, 31, tzinfo=pytz.utc),
period_end=datetime(2008, 1, 7, tzinfo=pytz.utc),
capital_base=100000,
)
expected_trading_days = (
datetime(2007, 12, 31, tzinfo=pytz.utc),
# Skip new years
# holidays taken from: http://www.nyse.com/press/1191407641943.html
datetime(2008, 1, 2, tzinfo=pytz.utc),
datetime(2008, 1, 3, tzinfo=pytz.utc),
datetime(2008, 1, 4, tzinfo=pytz.utc),
# Skip Saturday
# Skip Sunday
datetime(2008, 1, 7, tzinfo=pytz.utc)
)
num_expected_trading_days = 5
self.assertEquals(num_expected_trading_days, env.days_in_period)
np.testing.assert_array_equal(expected_trading_days,
env.trading_days.tolist())
@timed(EXTENDED_TIMEOUT)
def test_full_zipline(self):
# provide enough trades to ensure all orders are filled.
@@ -429,3 +348,183 @@ class FinanceTestCase(TestCase):
self.assertEqual(300, fls_order['amount'])
self.assertEqual(3.33, fls_order['limit'])
self.assertEqual(2, fls_order['sid'])
class TradingEnvironmentTestCase(TestCase):
"""
Tests for date management utilities in zipline.finance.trading.
"""
def setUp(self):
setup_logger(self)
def tearDown(self):
teardown_logger(self)
@classmethod
def setUpClass(cls):
cls.env = TradingEnvironment()
@timed(DEFAULT_TIMEOUT)
def test_is_trading_day(self):
# holidays taken from: http://www.nyse.com/press/1191407641943.html
new_years = datetime(2008, 1, 1, tzinfo=pytz.utc)
mlk_day = datetime(2008, 1, 21, tzinfo=pytz.utc)
presidents = datetime(2008, 2, 18, tzinfo=pytz.utc)
good_friday = datetime(2008, 3, 21, tzinfo=pytz.utc)
memorial_day = datetime(2008, 5, 26, tzinfo=pytz.utc)
july_4th = datetime(2008, 7, 4, tzinfo=pytz.utc)
labor_day = datetime(2008, 9, 1, tzinfo=pytz.utc)
tgiving = datetime(2008, 11, 27, tzinfo=pytz.utc)
christmas = datetime(2008, 5, 25, tzinfo=pytz.utc)
a_saturday = datetime(2008, 8, 2, tzinfo=pytz.utc)
a_sunday = datetime(2008, 10, 12, tzinfo=pytz.utc)
holidays = [
new_years,
mlk_day,
presidents,
good_friday,
memorial_day,
july_4th,
labor_day,
tgiving,
christmas,
a_saturday,
a_sunday
]
for holiday in holidays:
self.assertTrue(not self.env.is_trading_day(holiday))
first_trading_day = datetime(2008, 1, 2, tzinfo=pytz.utc)
last_trading_day = datetime(2008, 12, 31, tzinfo=pytz.utc)
workdays = [first_trading_day, last_trading_day]
for workday in workdays:
self.assertTrue(self.env.is_trading_day(workday))
def test_simulation_parameters(self):
env = SimulationParameters(
period_start=datetime(2008, 1, 1, tzinfo=pytz.utc),
period_end=datetime(2008, 12, 31, tzinfo=pytz.utc),
capital_base=100000,
)
self.assertTrue(env.last_close.month == 12)
self.assertTrue(env.last_close.day == 31)
@timed(DEFAULT_TIMEOUT)
def test_sim_params_days_in_period(self):
# January 2008
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
env = SimulationParameters(
period_start=datetime(2007, 12, 31, tzinfo=pytz.utc),
period_end=datetime(2008, 1, 7, tzinfo=pytz.utc),
capital_base=100000,
)
expected_trading_days = (
datetime(2007, 12, 31, tzinfo=pytz.utc),
# Skip new years
# holidays taken from: http://www.nyse.com/press/1191407641943.html
datetime(2008, 1, 2, tzinfo=pytz.utc),
datetime(2008, 1, 3, tzinfo=pytz.utc),
datetime(2008, 1, 4, tzinfo=pytz.utc),
# Skip Saturday
# Skip Sunday
datetime(2008, 1, 7, tzinfo=pytz.utc)
)
num_expected_trading_days = 5
self.assertEquals(num_expected_trading_days, env.days_in_period)
np.testing.assert_array_equal(expected_trading_days,
env.trading_days.tolist())
@timed(DEFAULT_TIMEOUT)
def test_market_minute_window(self):
# January 2008
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
us_east = pytz.timezone('US/Eastern')
utc = pytz.utc
# 10:01 AM Eastern on January 7th..
start = us_east.localize(datetime(2008, 1, 7, 10, 1))
utc_start = start.astimezone(utc)
# Get the next 10 minutes
minutes = self.env.market_minute_window(
utc_start, 10,
)
self.assertEqual(len(minutes), 10)
for i in range(10):
self.assertEqual(minutes[i], utc_start + timedelta(minutes=i))
# Get the previous 10 minutes.
minutes = self.env.market_minute_window(
utc_start, 10, step=-1,
)
self.assertEqual(len(minutes), 10)
for i in range(10):
self.assertEqual(minutes[i], utc_start + timedelta(minutes=-i))
# Get the next 900 minutes, including utc_start, rolling over into the
# next two days.
# Should include:
# Today: 10:01 AM -> 4:00 PM (360 minutes)
# Tomorrow: 9:31 AM -> 4:00 PM (390 minutes, 750 total)
# Last Day: 9:31 AM -> 12:00 PM (150 minutes, 900 total)
minutes = self.env.market_minute_window(
utc_start, 900,
)
today = self.env.market_minutes_for_day(start)[30:]
tomorrow = self.env.market_minutes_for_day(
start + timedelta(days=1)
)
last_day = self.env.market_minutes_for_day(
start + timedelta(days=2))[:150]
self.assertEqual(len(minutes), 900)
self.assertEqual(minutes[0], utc_start)
self.assertTrue(all(today == minutes[:360]))
self.assertTrue(all(tomorrow == minutes[360:750]))
self.assertTrue(all(last_day == minutes[750:]))
# Get the previous 801 minutes, including utc_start, rolling over into
# Friday the 4th and Thursday the 3rd.
# Should include:
# Today: 10:01 AM -> 9:31 AM (31 minutes)
# Friday: 4:00 PM -> 9:31 AM (390 minutes, 421 total)
# Thursday: 4:00 PM -> 9:41 AM (380 minutes, 801 total)
minutes = self.env.market_minute_window(
utc_start, 801, step=-1,
)
today = self.env.market_minutes_for_day(start)[30::-1]
# minus an extra two days from each of these to account for the two
# weekend days we skipped
friday = self.env.market_minutes_for_day(
start + timedelta(days=-3),
)[::-1]
thursday = self.env.market_minutes_for_day(
start + timedelta(days=-4),
)[:9:-1]
self.assertEqual(len(minutes), 801)
self.assertEqual(minutes[0], utc_start)
self.assertTrue(all(today == minutes[:31]))
self.assertTrue(all(friday == minutes[31:421]))
self.assertTrue(all(thursday == minutes[421:]))
+86 -14
View File
@@ -24,10 +24,14 @@ from zipline.history.history_container import HistoryContainer
from zipline.protocol import BarData
import zipline.utils.factory as factory
from zipline import TradingAlgorithm
from zipline.finance.trading import SimulationParameters
from zipline.finance.trading import SimulationParameters, TradingEnvironment
from zipline.sources import RandomWalkSource
from .history_cases import (
HISTORY_CONTAINER_TEST_CASES,
)
# Cases are over the July 4th holiday, to ensure use of trading calendar.
# March 2013
@@ -73,7 +77,7 @@ from zipline.sources import RandomWalkSource
# Times to be converted via:
# pd.Timestamp('2013-07-05 9:31', tz='US/Eastern').tz_convert('UTC')},
MINUTE_CASES_RAW = {
INDEX_TEST_CASES_RAW = {
'week of daily data': {
'input': {'bar_count': 5,
'frequency': '1d',
@@ -86,6 +90,18 @@ MINUTE_CASES_RAW = {
'2013-07-05 9:31AM',
]
},
'five minutes on july 5th open': {
'input': {'bar_count': 5,
'frequency': '1m',
'algo_dt': '2013-07-05 9:31AM'},
'expected': [
'2013-07-03 12:57PM',
'2013-07-03 12:58PM',
'2013-07-03 12:59PM',
'2013-07-03 1:00PM',
'2013-07-05 9:31AM',
]
},
}
@@ -104,28 +120,31 @@ def convert_cases(cases):
in case['expected']])
return cases
MINUTE_CASES = convert_cases(MINUTE_CASES_RAW)
INDEX_TEST_CASES = convert_cases(INDEX_TEST_CASES_RAW)
def index_at_dt(case_input):
def get_index_at_dt(case_input):
history_spec = history.HistorySpec(
case_input['bar_count'],
case_input['frequency'],
None,
False
)
return history.index_at_dt(history_spec,
case_input['algo_dt'])
return history.index_at_dt(history_spec, case_input['algo_dt'])
class TestHistoryIndex(TestCase):
@classmethod
def setUpClass(cls):
cls.environment = TradingEnvironment.instance()
@parameterized.expand(
[(name, case['input'], case['expected'])
for name, case in MINUTE_CASES.items()]
for name, case in INDEX_TEST_CASES.items()]
)
def test_index_at_dt(self, name, case_input, expected):
history_index = index_at_dt(case_input)
history_index = get_index_at_dt(case_input)
history_series = pd.Series(index=history_index)
expected_series = pd.Series(index=expected)
@@ -135,9 +154,64 @@ class TestHistoryIndex(TestCase):
class TestHistoryContainer(TestCase):
@classmethod
def setUpClass(cls):
cls.env = TradingEnvironment.instance()
def bar_data_dt(self, bar_data, require_unique=True):
"""
Get a dt to associate with the given BarData object.
If require_unique == True, throw an error if multiple unique dt's are
encountered. Otherwise, return the earliest dt encountered.
"""
dts = {sid_data['dt'] for sid_data in bar_data.values()}
if require_unique and len(dts) > 1:
self.fail("Multiple unique dts ({0}) in {1}".format(dts, bar_data))
return sorted(dts)[0]
@parameterized.expand(
[(name,
case['specs'],
case['sids'],
case['dt'],
case['updates'],
case['expected'])
for name, case in HISTORY_CONTAINER_TEST_CASES.items()]
)
def test_history_container(self,
name,
specs,
sids,
dt,
updates,
expected):
for spec in specs:
# Sanity check on test input.
self.assertEqual(len(expected[spec.key_str]), len(updates))
container = HistoryContainer(
{spec.key_str: spec for spec in specs}, sids, dt
)
for update_count, update in enumerate(updates):
bar_dt = self.bar_data_dt(update)
container.update(update, bar_dt)
for spec in specs:
pd.util.testing.assert_frame_equal(
container.get_history(spec, bar_dt),
expected[spec.key_str][update_count],
check_dtype=False,
check_column_type=True,
check_index_type=True,
check_frame_type=True,
)
def test_container_nans_and_daily_roll(self):
# set up trading environment
factory.create_simulation_parameters(num_days=4)
spec = history.HistorySpec(
bar_count=3,
@@ -145,7 +219,7 @@ class TestHistoryContainer(TestCase):
field='price',
ffill=True
)
specs = {hash(spec): spec}
specs = {spec.key_str: spec}
initial_sids = [1, ]
initial_dt = pd.Timestamp(
'2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC')
@@ -154,7 +228,7 @@ class TestHistoryContainer(TestCase):
specs, initial_sids, initial_dt)
bar_data = BarData()
container.update(bar_data, initial_dt)
# Since there was no backfill because of no db.
# And no first bar of data, so all values should be nans.
prices = container.get_history(spec, initial_dt)
@@ -169,7 +243,6 @@ class TestHistoryContainer(TestCase):
'price': 10,
'dt': second_bar_dt
}
container.update(bar_data, second_bar_dt)
prices = container.get_history(spec, second_bar_dt)
@@ -288,7 +361,6 @@ def handle_data(context, data):
# 12 13 14 15 16 17 18
# 19 20 21 22 23 24 25
# 26 27 28 29 30 31
start = pd.Timestamp('2006-03-20', tz='UTC')
end = pd.Timestamp('2006-03-21', tz='UTC')
+120 -2
View File
@@ -18,6 +18,7 @@ import logbook
import datetime
import pandas as pd
import numpy as np
from zipline.data.loader import load_market_data
from zipline.utils import tradingcalendar
@@ -174,11 +175,39 @@ class TradingEnvironment(object):
return None
def previous_trading_day(self, test_date):
dt = self.normalize_date(test_date)
delta = datetime.timedelta(days=-1)
while self.first_trading_day < test_date:
dt += delta
if dt in self.trading_days:
return dt
return None
def days_in_range(self, start, end):
mask = ((self.trading_days >= start) &
(self.trading_days <= end))
return self.trading_days[mask]
def minutes_for_days_in_range(self, start, end):
"""
Get all market minutes for the days between start and end, inclusive.
"""
start_date = self.normalize_date(start)
end_date = self.normalize_date(end)
all_minutes = []
for day in self.days_in_range(start_date, end_date):
day_minutes = self.market_minutes_for_day(day)
all_minutes.append(day_minutes)
# Concatenate all minutes and truncate minutes before start/after end.
return pd.DatetimeIndex(
np.concatenate(all_minutes), copy=False, tz='UTC',
)
def next_open_and_close(self, start_date):
"""
Given the start_date, returns the next open and close of
@@ -193,15 +222,104 @@ Last successful date: %s" % self.last_trading_day)
return self.get_open_and_close(next_open)
def previous_open_and_close(self, start_date):
"""
Given the start_date, returns the previous open and close of the
market.
"""
previous = self.previous_trading_day(start_date)
if previous is None:
raise NoFurtherDataError(
"Attempt to backtest beyond available history. "
"First successful date: %s" % self.first_trading_day)
return self.get_open_and_close(previous)
def next_market_minute(self, start):
"""
Get the next market minute after @start. This is either the immediate
next minute, or the open of the next market day after start.
"""
next_minute = start + datetime.timedelta(minutes=1)
if self.is_market_hours(next_minute):
return next_minute
return self.next_open_and_close(start)[0]
def previous_market_minute(self, start):
"""
Get the next market minute before @start. This is either the immediate
previous minute, or the close of the market day before start.
"""
prev_minute = start - datetime.timedelta(minutes=1)
if self.is_market_hours(prev_minute):
return prev_minute
return self.previous_open_and_close(start)[1]
def get_open_and_close(self, day):
todays_minutes = self.open_and_closes.ix[day.date()]
return todays_minutes['market_open'], todays_minutes['market_close']
def market_minutes_for_day(self, midnight):
market_open, market_close = self.get_open_and_close(midnight)
def market_minutes_for_day(self, stamp):
market_open, market_close = self.get_open_and_close(stamp)
return pd.date_range(market_open, market_close, freq='T')
def open_close_window(self, start, count, offset=0, step=1):
"""
Return a DataFrame containing `count` market opens and closes,
beginning with `start` + `offset` days and continuing `step` minutes at
a time.
"""
# TODO: Correctly handle end of data.
start_idx = self.get_index(start) + offset
stop_idx = start_idx + (count * step)
index = np.arange(start_idx, stop_idx, step)
return self.open_and_closes.iloc[index]
def market_minute_window(self, start, count, step=1):
"""
Return a DatetimeIndex containing `count` market minutes, starting with
`start` and continuing `step` minutes at a time.
"""
if not self.is_market_hours(start):
raise ValueError("market_minute_window starting at "
"non-market time {minute}".format(minute=start))
all_minutes = []
current_day_minutes = self.market_minutes_for_day(start)
first_minute_idx = current_day_minutes.searchsorted(start)
minutes_in_range = current_day_minutes[first_minute_idx::step]
# Build up list of lists of days' market minutes until we have count
# minutes stored altogether.
while True:
if len(minutes_in_range) >= count:
# Truncate off extra minutes
minutes_in_range = minutes_in_range[:count]
all_minutes.append(minutes_in_range)
count -= len(minutes_in_range)
if count <= 0:
break
if step > 0:
start, _ = self.next_open_and_close(start)
current_day_minutes = self.market_minutes_for_day(start)
else:
_, start = self.previous_open_and_close(start)
current_day_minutes = self.market_minutes_for_day(start)
minutes_in_range = current_day_minutes[::step]
# Concatenate all the accumulated minutes.
return pd.DatetimeIndex(
np.concatenate(all_minutes), copy=False, tz='UTC',
)
def trading_day_distance(self, first_date, second_date):
first_date = self.normalize_date(first_date)
second_date = self.normalize_date(second_date)
+221 -44
View File
@@ -32,19 +32,183 @@ class Frequency(object):
Represents how the data is sampled, as specified by the algoscript
via units like "1d", "1m", etc.
Currently only one frequency is supported, "1d"
"1d" provides data keyed by closing, and the last minute of the current
day.
Currently only two frequencies are supported, "1d" and "1m"
- "1d" provides data at daily frequency, with the latest bar aggregating
the elapsed minutes of the (incomplete) current day
- "1m" provides data at minute frequency
"""
SUPPORTED_FREQUENCIES = frozenset({'1d', '1m'})
MAX_MINUTES = {'m': 1, 'd': 390}
def __init__(self, freq_str):
if freq_str not in self.SUPPORTED_FREQUENCIES:
raise ValueError(
"history frequency must be in {supported}".format(
supported=self.SUPPORTED_FREQUENCIES,
))
# The string the at the algoscript specifies.
# Hold onto to use a key for caching.
self.freq_str = freq_str
# num - The number of units of the frequency.
# unit_str - The unit type, e.g. 'd'
self.num, self.unit_str = parse_freq_str(freq_str)
def next_window_start(self, previous_window_close):
"""
Get the first minute of the window starting after a window that
finished on @previous_window_close.
"""
if self.unit_str == 'd':
return self.next_day_window_start(previous_window_close)
elif self.unit_str == 'm':
return self.next_minute_window_start(previous_window_close)
@staticmethod
def next_day_window_start(previous_window_close):
"""
Get the next day window start after @previous_window_close. This is
defined as the first market open strictly greater than
@previous_window_close.
"""
env = trading.environment
next_open, _ = env.next_open_and_close(previous_window_close)
return next_open
@staticmethod
def next_minute_window_start(previous_window_close):
"""
Get the next minute window start after @previous_window_close. This is
defined as the first market minute strictly greater than
@previous_window_close.
"""
env = trading.environment
return env.next_market_minute(previous_window_close)
def window_open(self, window_close):
"""
For a period ending on `window_end`, calculate the date of the first
minute bar that should be used to roll a digest for this frequency.
"""
if self.unit_str == 'd':
return self.day_window_open(window_close, self.num)
elif self.unit_str == 'm':
return self.minute_window_open(window_close, self.num)
def window_close(self, window_start):
"""
For a period starting on `window_start`, calculate the date of the last
minute bar that should be used to roll a digest for this frequency.
"""
if self.unit_str == 'd':
return self.day_window_close(window_start, self.num)
elif self.unit_str == 'm':
return self.minute_window_close(window_start, self.num)
@staticmethod
def day_window_open(window_close, num_days):
"""
Get the first minute for a daily window of length @num_days with last
minute @window_close. This is calculated by searching backward until
@num_days market_closes are encountered.
"""
env = trading.environment
open_ = env.open_close_window(
window_close,
1,
offset=-(num_days - 1)
).market_open.iloc[0]
return open_
@staticmethod
def minute_window_open(window_close, num_minutes):
"""
Get the first minute for a minutely window of length @num_minutes with
last minute @window_close.
This is defined as window_close if num_minutes == 1, and otherwise as
the N-1st market minute after @window_start.
"""
if num_minutes == 1:
# Short circuit this case.
return window_close
env = trading.environment
return env.market_minute_window(window_close, count=-num_minutes)[-1]
@staticmethod
def day_window_close(window_start, num_days):
"""
Get the last minute for a daily window of length @num_days with first
minute @window_start. This is calculated by searching forward until
@num_days market closes are encountered.
Examples:
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 1
--> window_close = Thursday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 3:59 AM EST
num_days = 1
--> window_close = Thursday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 2
--> window_close = Friday March 2nd, 2006, 4:00 PM EST
window_start = Thursday March 2nd, 2006, 9:31 AM EST
num_days = 3
--> window_close = Monday March 6th, 2006, 4:00 PM EST
# Day before July 4th is an early close
window_start = Wednesday July 3rd, 2013, 9:31 AM EST
num_days = 1
--> window_close = Wednesday July 3rd, 2013, 1:00 PM EST
"""
env = trading.environment
close = env.open_close_window(
window_start,
1,
offset=num_days - 1
).market_close.iloc[0]
return close
@staticmethod
def minute_window_close(window_start, num_minutes):
"""
Get the last minute for a minutely window of length @num_minutes with
first minute @window_start.
This is defined as window_start if num_minutes == 1, and otherwise as
the N-1st market minute after @window_start.
"""
if num_minutes == 1:
# Short circuit this case.
return window_start
env = trading.environment
return env.market_minute_window(window_start, count=num_minutes)[-1]
@property
def max_minutes(self):
"""
The maximum number of minutes required to roll a bar at this frequency.
"""
return self.MAX_MINUTES[self.unit_str] * self.num
def __eq__(self, other):
return self.freq_str == other.freq_str
def __hash__(self):
return hash(self.freq_str)
def __repr__(self):
return ''.join([str(self.__class__.__name__),
"('", self.freq_str, "')"])
class HistorySpec(object):
"""
@@ -55,6 +219,8 @@ class HistorySpec(object):
result frames.
"""
FORWARD_FILLABLE = frozenset({'price'})
@classmethod
def spec_key(cls, bar_count, freq_str, field, ffill):
"""
@@ -73,63 +239,74 @@ class HistorySpec(object):
# The field, e.g. 'price', 'volume', etc.
self.field = field
# Whether or not to forward fill the nan data.
self.ffill = ffill
# How many trading days the spec needs to look back.
# Used by index creation to see how large of an overarching window
# is needed.
self.days_needed = calculate_days_needed(
self.bar_count, self.frequency)
self._ffill = ffill
# Calculate the cache key string once.
self.key_str = self.spec_key(
bar_count, frequency.freq_str, field, ffill)
@property
def ffill(self):
"""
Wrapper around ffill that returns False for fields which are not
forward-fillable.
"""
return self._ffill and self.field in self.FORWARD_FILLABLE
def calculate_days_needed(bar_count, freq):
""" Returns number trading days needed.
Overshoots so that we more than enough to sample from the current
frequency slot plus previous ones.
def __repr__(self):
return ''.join([self.__class__.__name__, "('", self.key_str, "')"])
def days_index_at_dt(history_spec, algo_dt):
"""
if freq.unit_str == 'd':
return bar_count * freq.num
def days_index_at_dt(days_needed, algo_dt):
"""
The timestamps of previous days closes with the size of @days_needed
at @algo_dt.
Get the index of a frame to be used for a get_history call with daily
frequency.
"""
env = trading.environment
# Get the previous (bar_count - 1) days' worth of market closes.
day_delta = (history_spec.bar_count - 1) * history_spec.frequency.num
market_closes = env.open_close_window(
algo_dt,
day_delta,
offset=(-day_delta),
step=history_spec.frequency.num,
).market_close
latest_algo_dt = algo_dt
current_index = env.open_and_closes.index.searchsorted(algo_dt.date())
previous_days_num = days_needed - 1
previous_days = env.open_and_closes['market_close'][
current_index - previous_days_num:current_index]
# Append the current algo_dt as the last index value.
# Using the 'rawer' numpy array values here because of a bottleneck
# that appeared when using DatetimeIndex
return np.append(previous_days.values, latest_algo_dt)
return np.append(market_closes.values, algo_dt)
def minutes_index_at_dt(history_spec, algo_dt):
"""
Get the index of a frame to be used for a get_history_call with minutely
frequency.
"""
# TODO: This is almost certainly going to be too slow for production.
env = trading.environment
return env.market_minute_window(
algo_dt,
history_spec.bar_count,
step=-1,
)[::-1]
def index_at_dt(history_spec, algo_dt):
"""
The index, including @algo_dt at the given @algo_dt for the count
and frequency of the @history_spec.
Returns index of a frame returned by get_history() with the given
history_spec and algo_dt.
The resulting index `@history_spec.bar_count` bars, increasing in units of
`@history_spec.frequency`, terminating at the given @algo_dt.
Note: The last bar of the returned frame represents an as-of-yet incomplete
time window, so the delta between the last and second-to-last bars is
usually always less than `@history_spec.frequency` for frequencies greater
than 1m.
"""
days_index = days_index_at_dt(history_spec.days_needed, algo_dt)
frequency = history_spec.frequency
if frequency.unit_str == 'd':
index_of_algo_dt = days_index.searchsorted(algo_dt)
start_index = index_of_algo_dt + 1 - history_spec.bar_count
end_index = index_of_algo_dt + 1
return days_index[start_index:end_index]
return days_index_at_dt(history_spec, algo_dt)
elif frequency.unit_str == 'm':
return minutes_index_at_dt(history_spec, algo_dt)
+345 -149
View File
@@ -12,56 +12,124 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import groupby
import numpy as np
import pandas as pd
from six import itervalues
from six import itervalues, iteritems, iterkeys
from . history import (
index_at_dt,
days_index_at_dt,
)
from zipline.finance import trading
from zipline.utils.data import RollingPanel
# The closing price is referred to by multiple names,
# allow both for price rollover logic etc.
CLOSING_PRICE_FIELDS = {'price', 'close_price'}
CLOSING_PRICE_FIELDS = frozenset({'price', 'close_price'})
def create_initial_day_panel(days_needed, fields, sids, dt):
index = days_index_at_dt(days_needed, dt)
# Use original index in case of 1 bar.
if days_needed != 1:
index = index[:-1]
window = len(index)
rp = RollingPanel(window, fields, sids)
for i, day in enumerate(index):
rp.index_buf[i] = day
rp.pos = window
return rp
def ffill_buffer_from_prior_values(field,
buffer_frame,
digest_frame,
pre_digest_values):
"""
Forward-fill a buffer frame, falling back to the end-of-period values of a
digest frame if the buffer frame has leading NaNs.
"""
# Get values which are NaN at the beginning of the period.
first_bar = buffer_frame.iloc[0]
def iter_nan_sids():
"""
Helper for iterating over the remaining nan sids in first_bar.
"""
return (sid for sid in first_bar[first_bar.isnull()].index)
# Try to fill with the last entry from the digest frame.
if digest_frame is not None:
# We don't store a digest frame for frequencies that only have a bar
# count of 1.
for sid in iter_nan_sids():
buffer_frame[sid][0] = digest_frame.ix[-1, sid]
# If we still have nan sids, try to fill with pre_digest_values.
for sid in iter_nan_sids():
prior_sid_value = pre_digest_values[field].get(sid)
if prior_sid_value:
# If the prior value is greater than the timestamp of our first
# bar.
if prior_sid_value.get('dt', first_bar.name) > first_bar.name:
buffer_frame[sid][0] = prior_sid_value.get('value', np.nan)
return buffer_frame.ffill()
def create_current_day_panel(fields, sids, dt):
# Can't use open_and_close since need to create enough space for a full
# day, even on a half day.
# Can now use mkt open and close, since we don't roll
env = trading.environment
index = env.market_minutes_for_day(dt)
return pd.Panel(items=fields, minor_axis=sids, major_axis=index)
def ffill_digest_frame_from_prior_values(field, digest_frame, prior_values):
"""
Forward-fill a digest frame, falling back to the last known priof values if
necessary.
"""
if digest_frame is not None:
# Digest frame is None in the case that we only have length 1 history
# specs for a given frequency.
# It's possible that the first bar in our digest frame is storing NaN
# values. If so, check if we've tracked an older value and use that as
# an ffill value for the first bar.
first_bar = digest_frame.ix[0]
nan_sids = first_bar[first_bar.isnull()].index
for sid in nan_sids:
try:
# Only use prior value if it is before the index,
# so that a backfill does not accidentally occur.
if prior_values[field][sid]['dt'] <= digest_frame.index[0]:
digest_frame[sid][0] = prior_values[field][sid]['value']
except KeyError:
# Allow case where there is no previous value.
# e.g. with leading nans.
pass
digest_frame = digest_frame.ffill()
return digest_frame
def ffill_day_frame(field, day_frame, prior_day_frame):
# get values which are nan-at the beginning of the day
# and attempt to fill with the last close
first_bar = day_frame.ix[0]
nan_sids = first_bar[np.isnan(first_bar)]
for sid, _ in nan_sids.iterkv():
day_frame[sid][0] = prior_day_frame.ix[-1, sid]
if field != 'volume':
day_frame = day_frame.ffill()
return day_frame
def freq_str_and_bar_count(history_spec):
"""
Helper for getting the frequency string from a history spec.
"""
return (history_spec.frequency.freq_str, history_spec.bar_count)
def group_by_frequency(history_specs):
"""
Takes an iterable of history specs and returns a dictionary mapping unique
frequencies to a list of specs with that frequency.
Within each list, the HistorySpecs are sorted by ascending bar count.
Example:
[HistorySpec(3, '1d', 'price', True),
HistorySpec(2, '2d', 'open', True),
HistorySpec(2, '1d', 'open', False),
HistorySpec(5, '1m', 'open', True)]
yields
{Frequency('1d') : [HistorySpec(2, '1d', 'open', False)],
HistorySpec(3, '1d', 'price', True),
Frequency('2d') : [HistorySpec(2, '2d', 'open', True)],
Frequency('1m') : [HistorySpec(5, '1m', 'open', True)]}
"""
return {key: list(group)
for key, group in groupby(
sorted(history_specs, key=freq_str_and_bar_count),
key=lambda spec: spec.frequency)}
class HistoryContainer(object):
@@ -78,35 +146,105 @@ class HistoryContainer(object):
# History specs to be served by this container.
self.history_specs = history_specs
# The overaching panel needs to be large enough to contain the
# largest history spec
self.max_days_needed = max(spec.days_needed for spec
in itervalues(history_specs))
self.frequency_groups = \
group_by_frequency(itervalues(self.history_specs))
# The set of fields specified by all history specs
self.fields = set(spec.field for spec in itervalues(history_specs))
self.prior_day_panel = create_initial_day_panel(
self.max_days_needed, self.fields, initial_sids, initial_dt)
# This panel contains raw minutes for periods that haven't been fully
# completed. When a frequency period rolls over, these minutes are
# digested using some sort of aggregation call on the panel (e.g. `sum`
# for volume, `max` for high, `min` for low, etc.).
self.buffer_panel = self.create_buffer_panel(
initial_sids,
initial_dt,
)
# This panel contains the minutes for the current day.
# The value that is used is some sort of aggregation call on the
# panel, e.g. `sum` for volume, `max` for high, etc.
self.current_day_panel = create_current_day_panel(
self.fields, initial_sids, initial_dt)
# Dictionaries with Frequency objects as keys.
self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
self.create_digest_panels(initial_sids, initial_dt)
# Populating initial frames here, so that the cost of creating the
# initial frames does not show up when profiling. These frames are
# cached since mid-stream creation of containing data frames on every
# bar is expensive.
self.create_return_frames(initial_dt)
# Helps prop up the prior day panel against having a nan, when the data
# has been seen.
self.last_known_prior_values = {field: {} for field in self.fields}
# Populating initial frames here, so that the cost of creating the
# initial frames does not show up when profiling get_y
# These frames are cached since mid-stream creation of containing
# data frames on every bar is expensive.
self.return_frames = {}
@property
def unique_frequencies(self):
"""
Return an iterator over all the unique frequencies serviced by this
container.
"""
return iterkeys(self.frequency_groups)
self.create_return_frames(initial_dt)
def create_digest_panels(self, initial_sids, initial_dt):
"""
Initialize a RollingPanel for each unique panel frequency being stored
by this container. Each RollingPanel pre-allocates enough storage
space to service the highest bar-count of any history call that it
serves.
Relies on the fact that group_by_frequency sorts the value lists by
ascending bar count.
"""
# Map from frequency -> first/last minute of the next digest to be
# rolled for that frequency.
first_window_starts = {}
first_window_closes = {}
# Map from frequency -> digest_panels.
panels = {}
for freq, specs in iteritems(self.frequency_groups):
# Relying on the sorting of group_by_frequency to get the spec
# requiring the largest number of bars.
largest_spec = specs[-1]
if largest_spec.bar_count == 1:
# No need to allocate a digest panel; this frequency will only
# ever use data drawn from self.buffer_panel.
env = trading.environment
first_window_closes[freq] = \
env.get_open_and_close(initial_dt)[1]
first_window_starts[freq] = \
freq.window_open(first_window_closes[freq])
continue
initial_dates = index_at_dt(largest_spec, initial_dt)
# Set up dates for our first digest roll, which is keyed to the
# close of the first entry in our initial index.
first_window_closes[freq] = initial_dates[0]
first_window_starts[freq] = freq.window_open(initial_dates[0])
rp = RollingPanel(len(initial_dates) - 1,
self.fields,
initial_sids)
panels[freq] = rp
return panels, first_window_starts, first_window_closes
def create_buffer_panel(self, initial_sids, initial_dt):
"""
Initialize a RollingPanel containing enough minutes to service all our
frequencies.
"""
max_bars_needed = max(freq.max_minutes
for freq in self.unique_frequencies)
rp = RollingPanel(
max_bars_needed,
self.fields,
initial_sids,
# Restrict the initial data down to just the fields being used in
# this container.
)
return rp
def create_return_frames(self, algo_dt):
"""
@@ -114,101 +252,158 @@ class HistoryContainer(object):
Called during init and at universe rollovers.
"""
for history_spec in itervalues(self.history_specs):
index = index_at_dt(history_spec, algo_dt)
index = pd.to_datetime(index)
self.return_frames = {}
for spec_key, history_spec in iteritems(self.history_specs):
index = pd.to_datetime(index_at_dt(history_spec, algo_dt))
frame = pd.DataFrame(
index=index,
columns=map(int, self.current_day_panel.minor_axis.values),
columns=map(int, self.buffer_panel.minor_axis.values),
dtype=np.float64)
self.return_frames[history_spec] = frame
self.return_frames[spec_key] = frame
def buffer_panel_minutes(self,
buffer_panel=None,
earliest_minute=None,
latest_minute=None):
"""
Get the minutes in @buffer_panel between @earliest_minute and
@last_minute, inclusive.
@buffer_panel can be a RollingPanel or a plain Panel. If a
RollingPanel is supplied, we call `get_current` to extract a Panel
object. If no panel is supplied, we use self.buffer_panel.
If no value is specified for @earliest_minute, use all the minutes we
have up until @latest minute.
If no value for @latest_minute is specified, use all values up until
the latest minute.
"""
buffer_panel = buffer_panel or self.buffer_panel
if isinstance(buffer_panel, RollingPanel):
buffer_panel = buffer_panel.get_current()
return buffer_panel.ix[:, earliest_minute:latest_minute, :]
def update(self, data, algo_dt):
"""
Takes the bar at @algo_dt's @data and adds to the current day panel.
Takes the bar at @algo_dt's @data, checks to see if we need to roll any
new digests, then adds new data to the buffer panel.
"""
self.check_and_roll(algo_dt)
self.update_digest_panels(algo_dt, self.buffer_panel)
fields = self.fields
field_data = {sid: {field: bar[field] for field in fields}
for sid, bar in data.iteritems()
if (bar
and
bar['dt'] == algo_dt
and
# Only use data which is keyed in the data panel.
# Prevents crashes due to custom data.
sid in self.current_day_panel.minor_axis)}
field_frame = pd.DataFrame(field_data)
self.current_day_panel.ix[:, algo_dt, :] = field_frame.T
frame = pd.DataFrame(
{sid: {field: bar[field] for field in fields}
for sid, bar in data.iteritems()
if (bar
and
bar['dt'] == algo_dt
and
# Only use data which is keyed in the data panel.
# Prevents crashes due to custom data.
sid in self.buffer_panel.minor_axis)})
self.buffer_panel.add_frame(algo_dt, frame)
def update_digest_panels(self, algo_dt, buffer_panel):
"""
Check whether @algo_dt is greater than cur_window_close for any of our
frequencies. If so, roll a digest for that frequency using data drawn
from @buffer panel and insert it into the appropriate digest panels.
"""
for frequency in self.unique_frequencies:
# We don't keep a digest panel if we only have a length-1 history
# spec for a given frequency
digest_panel = self.digest_panels.get(frequency, None)
while algo_dt > self.cur_window_closes[frequency]:
earliest_minute = self.cur_window_starts[frequency]
latest_minute = self.cur_window_closes[frequency]
minutes_to_process = self.buffer_panel_minutes(
buffer_panel,
earliest_minute=earliest_minute,
latest_minute=latest_minute,
)
# Create a digest from minutes_to_process and add it to
# digest_panel.
self.roll(frequency,
digest_panel,
minutes_to_process,
latest_minute)
# Update panel start/close for this frequency.
self.cur_window_starts[frequency] = \
frequency.next_window_start(latest_minute)
self.cur_window_closes[frequency] = \
frequency.window_close(self.cur_window_starts[frequency])
def roll(self, frequency, digest_panel, buffer_minutes, digest_dt):
"""
Package up minutes in @buffer_minutes insert that bar into
@digest_panel at index @last_minute, and update
self.cur_window_{starts|closes} for the given frequency.
"""
if digest_panel is None:
# This happens if the only spec we have at this frequency has a bar
# count of 1.
return
def roll(self, roll_dt):
env = trading.environment
# This should work for price, but not others, e.g.
# open.
# Get the most recent value.
rolled = pd.DataFrame(
index=self.current_day_panel.items,
columns=self.current_day_panel.minor_axis)
index=self.fields,
columns=buffer_minutes.minor_axis)
for field in self.fields:
if field in CLOSING_PRICE_FIELDS:
# Use the last price.
prices = self.current_day_panel.ffill().ix[field, -1, :]
# Use the last close, or NaN if we have no minutes.
try:
prices = buffer_minutes.loc[field].ffill().iloc[-1]
except IndexError:
# Scalar assignment sets the value for all entries.
prices = np.nan
rolled.ix[field] = prices
elif field == 'open_price':
# Use the first price.
opens = self.current_day_panel.ix['open_price', 0, :]
# Use the first open, or NaN if we have no minutes.
try:
opens = buffer_minutes.loc[field].bfill().iloc[0]
except IndexError:
# Scalar assignment sets the value for all entries.
opens = np.nan
rolled.ix['open_price'] = opens
elif field == 'volume':
# Volume is the sum of the volumes during the
# course of the day
volumes = self.current_day_panel.ix['volume'].apply(np.sum)
# course of the period.
volumes = buffer_minutes.ix['volume'].sum().fillna(0)
rolled.ix['volume'] = volumes
elif field == 'high':
# Use the highest high.
highs = self.current_day_panel.ix['high'].apply(np.max)
highs = buffer_minutes.ix['high'].max()
rolled.ix['high'] = highs
elif field == 'low':
# Use the lowest low.
lows = self.current_day_panel.ix['low'].apply(np.min)
lows = buffer_minutes.ix['low'].min()
rolled.ix['low'] = lows
for sid, value in rolled.ix[field].iterkv():
if not np.isnan(value):
try:
prior_values = self.last_known_prior_values[field][sid]
prior_values = \
self.last_known_prior_values[field][sid]
except KeyError:
prior_values = {}
self.last_known_prior_values[field][sid] = prior_values
prior_values['dt'] = roll_dt
self.last_known_prior_values[field][sid] = \
prior_values
prior_values['dt'] = digest_dt
prior_values['value'] = value
self.prior_day_panel.add_frame(roll_dt, rolled)
# Create a new 'current day' collector.
next_day = env.next_trading_day(roll_dt)
if next_day:
# Only create the next panel if there is a next day.
# i.e. don't create the next panel on the last day of
# the backest/current day of live trading.
self.current_day_panel = create_current_day_panel(
self.fields,
# Will break on quarter rollover.
self.current_day_panel.minor_axis,
next_day)
def check_and_roll(self, algo_dt):
"""
Check whether the algo_dt is at the end of a day.
If it is, aggregate the day's minute data and store it in the prior
day panel.
"""
# Use a while loop to account for illiquid bars.
while algo_dt > self.current_day_panel.major_axis[-1]:
roll_dt = self.current_day_panel.major_axis[-1]
self.roll(roll_dt)
digest_panel.add_frame(digest_dt, rolled)
def get_history(self, history_spec, algo_dt):
"""
@@ -217,57 +412,58 @@ class HistoryContainer(object):
Selects from the overarching history panel the values for the
@history_spec at the given @algo_dt.
"""
field = history_spec.field
bar_count = history_spec.bar_count
do_ffill = history_spec.ffill
index = index_at_dt(history_spec, algo_dt)
index = pd.to_datetime(index)
index = pd.to_datetime(index_at_dt(history_spec, algo_dt))
return_frame = self.return_frames[history_spec.key_str]
frame = self.return_frames[history_spec]
# Overwrite the index.
# Not worrying about values here since the values are overwritten
# in the next step.
frame.index = index
return_frame.index = index
prior_day_panel = self.prior_day_panel.get_current()
prior_day_frame = prior_day_panel[field].copy()
if history_spec.ffill:
first_bar = prior_day_frame.ix[0]
nan_sids = first_bar[first_bar.isnull()]
for sid, _ in nan_sids.iterkv():
try:
if (
# Only use prior value if it is before the index,
# so that a backfill does not accidentally occur.
self.last_known_prior_values[field][sid]['dt'] <=
prior_day_frame.index[0]):
prior_day_frame[sid][0] =\
self.last_known_prior_values[field][sid]['value']
except KeyError:
# Allow case where there is no previous value.
# e.g. with leading nans.
pass
prior_day_frame = prior_day_frame.ffill()
frame.ix[:-1] = prior_day_frame.ix[:]
if bar_count > 1:
# Get the last bar_count - 1 frames from our stored historical
# frames.
digest_panel = self.digest_panels[history_spec.frequency]\
.get_current()
digest_frame = digest_panel[field].copy().ix[1 - bar_count:]
else:
digest_frame = None
# Copy the current day frame, since the fill behavior will mutate
# the values in the panel.
current_day_frame = self.current_day_panel[field][:algo_dt].copy()
if history_spec.ffill:
current_day_frame = ffill_day_frame(field,
current_day_frame,
prior_day_frame)
# Get minutes from our buffer panel to build the last row.
buffer_frame = self.buffer_panel_minutes(
earliest_minute=self.cur_window_starts[history_spec.frequency],
)[field].copy()
if do_ffill:
digest_frame = ffill_digest_frame_from_prior_values(
field,
digest_frame,
self.last_known_prior_values,
)
buffer_frame = ffill_buffer_from_prior_values(
field,
buffer_frame,
digest_frame,
self.last_known_prior_values,
)
if digest_frame is not None:
return_frame.ix[:-1] = digest_frame.ix[:]
if field == 'volume':
# This works for the day rollup, i.e. '1d',
# but '1m' will need to allow for 0 or nan minutes
frame.ix[algo_dt] = current_day_frame.sum()
return_frame.ix[algo_dt] = buffer_frame.fillna(0).sum()
elif field == 'high':
frame.ix[algo_dt] = current_day_frame.max()
return_frame.ix[algo_dt] = buffer_frame.max()
elif field == 'low':
frame.ix[algo_dt] = current_day_frame.min()
return_frame.ix[algo_dt] = buffer_frame.min()
elif field == 'open_price':
frame.ix[algo_dt] = current_day_frame.ix[0]
return_frame.ix[algo_dt] = buffer_frame.iloc[0]
else:
frame.ix[algo_dt] = current_day_frame.ix[algo_dt]
return_frame.ix[algo_dt] = buffer_frame.loc[algo_dt]
return frame
return return_frame
+5 -2
View File
@@ -157,8 +157,8 @@ class BarData(object):
usage of what this replaced as a dictionary subclass.
"""
def __init__(self):
self._data = {}
def __init__(self, data=None):
self._data = data or {}
self._contains_override = None
def __contains__(self, name):
@@ -217,3 +217,6 @@ class BarData(object):
def __len__(self):
return len(self.keys())
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, self._data)
+10 -5
View File
@@ -32,8 +32,8 @@ class RollingPanel(object):
Restrictions: major_axis can only be a DatetimeIndex for now
"""
def __init__(self, window, items, sids, cap_multiple=2,
dtype=np.float64):
def __init__(self, window, items, sids, cap_multiple=2, dtype=np.float64):
self.pos = 0
self.window = window
@@ -49,9 +49,14 @@ class RollingPanel(object):
self.buffer = self._create_buffer()
def _create_buffer(self):
return pd.Panel(items=self.items, minor_axis=self.minor_axis,
major_axis=range(self.cap),
dtype=self.dtype)
panel = pd.Panel(
items=self.items,
minor_axis=self.minor_axis,
major_axis=range(self.cap),
dtype=self.dtype,
)
return panel
def _update_buffer(self, frame):
# Drop outdated, nan-filled minors (sids) and items (fields)