Merge branch 'history_port'

This commit is contained in:
twiecki
2014-04-10 16:05:46 -04:00
9 changed files with 1222 additions and 8 deletions
+20
View File
@@ -506,3 +506,23 @@ def handle_data(context, data):
**self.zipline_test_config)
output, _ = drain_zipline(self, zipline)
class TestHistory(TestCase):
def test_history(self):
history_algo = """
from zipline.api import history, add_history
def initialize(context):
add_history(10, '1d', 'price')
def handle_data(context, data):
df = history(10, '1d', 'price')
"""
start = pd.Timestamp('1991-01-01', tz='UTC')
end = pd.Timestamp('1991-01-15', tz='UTC')
source = RandomWalkSource(start=start,
end=end)
algo = TradingAlgorithm(script=history_algo, data_frequency='minute')
output = algo.run(source)
self.assertIsNot(output, None)
+721
View File
@@ -0,0 +1,721 @@
#
# Copyright 2014 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from zipline.history import history
from zipline.history.history_container import HistoryContainer
from zipline.protocol import BarData
import zipline.utils.factory as factory
from zipline import TradingAlgorithm
from zipline.finance.trading import SimulationParameters
from zipline.sources import RandomWalkSource
# Cases are over the July 4th holiday, to ensure use of trading calendar.
# March 2013
# Su Mo Tu We Th Fr Sa
# 1 2
# 3 4 5 6 7 8 9
# 10 11 12 13 14 15 16
# 17 18 19 20 21 22 23
# 24 25 26 27 28 29 30
# 31
# April 2013
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6
# 7 8 9 10 11 12 13
# 14 15 16 17 18 19 20
# 21 22 23 24 25 26 27
# 28 29 30
#
# May 2013
# Su Mo Tu We Th Fr Sa
# 1 2 3 4
# 5 6 7 8 9 10 11
# 12 13 14 15 16 17 18
# 19 20 21 22 23 24 25
# 26 27 28 29 30 31
#
# June 2013
# Su Mo Tu We Th Fr Sa
# 1
# 2 3 4 5 6 7 8
# 9 10 11 12 13 14 15
# 16 17 18 19 20 21 22
# 23 24 25 26 27 28 29
# 30
# July 2013
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6
# 7 8 9 10 11 12 13
# 14 15 16 17 18 19 20
# 21 22 23 24 25 26 27
# 28 29 30 31
#
# Times to be converted via:
# pd.Timestamp('2013-07-05 9:31', tz='US/Eastern').tz_convert('UTC')},
MINUTE_CASES_RAW = {
'week of daily data': {
'input': {'bar_count': 5,
'frequency': '1d',
'algo_dt': '2013-07-05 9:31AM'},
'expected': [
'2013-06-28 4:00PM',
'2013-07-01 4:00PM',
'2013-07-02 4:00PM',
'2013-07-03 1:00PM',
'2013-07-05 9:31AM',
]
},
}
def to_timestamp(dt_str):
return pd.Timestamp(dt_str, tz='US/Eastern').tz_convert('UTC')
def convert_cases(cases):
"""
Convert raw strings to values comparable with system data.
"""
cases = cases.copy()
for case in cases.values():
case['input']['algo_dt'] = to_timestamp(case['input']['algo_dt'])
case['expected'] = pd.DatetimeIndex([to_timestamp(dt_str) for dt_str
in case['expected']])
return cases
MINUTE_CASES = convert_cases(MINUTE_CASES_RAW)
def index_at_dt(case_input):
history_spec = history.HistorySpec(
case_input['bar_count'],
case_input['frequency'],
None,
False
)
return history.index_at_dt(history_spec,
case_input['algo_dt'])
class TestHistoryIndex(TestCase):
@parameterized.expand(
[(name, case['input'], case['expected'])
for name, case in MINUTE_CASES.items()]
)
def test_index_at_dt(self, name, case_input, expected):
history_index = index_at_dt(case_input)
history_series = pd.Series(index=history_index)
expected_series = pd.Series(index=expected)
pd.util.testing.assert_series_equal(history_series, expected_series)
class TestHistoryContainer(TestCase):
def test_container_nans_and_daily_roll(self):
# set up trading environment
factory.create_simulation_parameters(num_days=4)
spec = history.HistorySpec(
bar_count=3,
frequency='1d',
field='price',
ffill=True
)
specs = {hash(spec): spec}
initial_sids = [1, ]
initial_dt = pd.Timestamp(
'2013-06-28 9:31AM', tz='US/Eastern').tz_convert('UTC')
container = HistoryContainer(
specs, initial_sids, initial_dt)
bar_data = BarData()
# Since there was no backfill because of no db.
# And no first bar of data, so all values should be nans.
prices = container.get_history(spec, initial_dt)
nan_values = np.isnan(prices[1])
self.assertTrue(all(nan_values), nan_values)
# Add data on bar two of first day.
second_bar_dt = pd.Timestamp(
'2013-06-28 9:32AM', tz='US/Eastern').tz_convert('UTC')
bar_data[1] = {
'price': 10,
'dt': second_bar_dt
}
container.update(bar_data, second_bar_dt)
prices = container.get_history(spec, second_bar_dt)
# Prices should be
# 1
# 2013-06-26 20:00:00+00:00 NaN
# 2013-06-27 20:00:00+00:00 NaN
# 2013-06-28 13:32:00+00:00 10
self.assertTrue(np.isnan(prices[1].ix[0]))
self.assertTrue(np.isnan(prices[1].ix[1]))
self.assertEqual(prices[1].ix[2], 10)
third_bar_dt = pd.Timestamp(
'2013-06-28 9:33AM', tz='US/Eastern').tz_convert('UTC')
del bar_data[1]
container.update(bar_data, third_bar_dt)
prices = container.get_history(spec, third_bar_dt)
# The one should be forward filled
# Prices should be
# 1
# 2013-06-26 20:00:00+00:00 NaN
# 2013-06-27 20:00:00+00:00 NaN
# 2013-06-28 13:33:00+00:00 10
self.assertEquals(prices[1][third_bar_dt], 10)
# Note that we did not fill in data at the close.
# There was a bug where a nan was being introduced because of the
# last value of 'raw' data was used, instead of a ffilled close price.
day_two_first_bar_dt = pd.Timestamp(
'2013-07-01 9:31AM', tz='US/Eastern').tz_convert('UTC')
bar_data[1] = {
'price': 20,
'dt': day_two_first_bar_dt
}
container.update(bar_data, day_two_first_bar_dt)
prices = container.get_history(spec, day_two_first_bar_dt)
# Prices Should Be
# 1
# 2013-06-27 20:00:00+00:00 nan
# 2013-06-28 20:00:00+00:00 10
# 2013-07-01 13:31:00+00:00 20
self.assertTrue(np.isnan(prices[1].ix[0]))
self.assertEqual(prices[1].ix[1], 10)
self.assertEqual(prices[1].ix[2], 20)
# Clear out the bar data
del bar_data[1]
day_three_first_bar_dt = pd.Timestamp(
'2013-07-02 9:31AM', tz='US/Eastern').tz_convert('UTC')
container.update(bar_data, day_three_first_bar_dt)
prices = container.get_history(spec, day_three_first_bar_dt)
# 1
# 2013-06-28 20:00:00+00:00 10
# 2013-07-01 20:00:00+00:00 20
# 2013-07-02 13:31:00+00:00 20
self.assertTrue(prices[1].ix[0], 10)
self.assertTrue(prices[1].ix[1], 20)
self.assertTrue(prices[1].ix[2], 20)
day_four_first_bar_dt = pd.Timestamp(
'2013-07-03 9:31AM', tz='US/Eastern').tz_convert('UTC')
container.update(bar_data, day_four_first_bar_dt)
prices = container.get_history(spec, day_four_first_bar_dt)
# 1
# 2013-07-01 20:00:00+00:00 20
# 2013-07-02 20:00:00+00:00 20
# 2013-07-03 13:31:00+00:00 20
self.assertEqual(prices[1].ix[0], 20)
self.assertEqual(prices[1].ix[1], 20)
self.assertEqual(prices[1].ix[2], 20)
class TestHistoryAlgo(TestCase):
def setUp(self):
np.random.seed(123)
def test_basic_history(self):
algo_text = """
from zipline.api import history, add_history
def initialize(context):
add_history(bar_count=2, frequency='1d', field='price')
def handle_data(context, data):
prices = history(bar_count=2, frequency='1d', field='price')
context.last_prices = prices
""".strip()
# March 2006
# Su Mo Tu We Th Fr Sa
# 1 2 3 4
# 5 6 7 8 9 10 11
# 12 13 14 15 16 17 18
# 19 20 21 22 23 24 25
# 26 27 28 29 30 31
start = pd.Timestamp('2006-03-20', tz='UTC')
end = pd.Timestamp('2006-03-21', tz='UTC')
sim_params = factory.create_simulation_parameters(
start=start, end=end)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
self.assertIsNotNone(output)
last_prices = test_algo.last_prices[0]
oldest_dt = pd.Timestamp(
'2006-03-20 4:00 PM', tz='US/Eastern').tz_convert('UTC')
newest_dt = pd.Timestamp(
'2006-03-21 4:00 PM', tz='US/Eastern').tz_convert('UTC')
self.assertEquals(oldest_dt, last_prices.index[0])
self.assertEquals(newest_dt, last_prices.index[-1])
# Random, depends on seed
self.assertEquals(139.36946942498648, last_prices[oldest_dt])
self.assertEquals(180.15661995395106, last_prices[newest_dt])
def test_basic_history_one_day(self):
algo_text = """
from zipline.api import history, add_history
def initialize(context):
add_history(bar_count=1, frequency='1d', field='price')
def handle_data(context, data):
prices = history(bar_count=1, frequency='1d', field='price')
context.last_prices = prices
""".strip()
# March 2006
# Su Mo Tu We Th Fr Sa
# 1 2 3 4
# 5 6 7 8 9 10 11
# 12 13 14 15 16 17 18
# 19 20 21 22 23 24 25
# 26 27 28 29 30 31
start = pd.Timestamp('2006-03-20', tz='UTC')
end = pd.Timestamp('2006-03-21', tz='UTC')
sim_params = factory.create_simulation_parameters(
start=start, end=end)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
self.assertIsNotNone(output)
last_prices = test_algo.last_prices[0]
# oldest and newest should be the same if there is only 1 bar
oldest_dt = pd.Timestamp(
'2006-03-21 4:00 PM', tz='US/Eastern').tz_convert('UTC')
newest_dt = pd.Timestamp(
'2006-03-21 4:00 PM', tz='US/Eastern').tz_convert('UTC')
self.assertEquals(oldest_dt, last_prices.index[0])
self.assertEquals(newest_dt, last_prices.index[-1])
# Random, depends on seed
self.assertEquals(180.15661995395106, last_prices[oldest_dt])
self.assertEquals(180.15661995395106, last_prices[newest_dt])
def test_basic_history_positional_args(self):
"""
Ensure that positional args work.
"""
algo_text = """
import copy
from zipline.api import history, add_history
def initialize(context):
add_history(2, '1d', 'price')
def handle_data(context, data):
prices = history(2, '1d', 'price')
context.last_prices = copy.deepcopy(prices)
""".strip()
# March 2006
# Su Mo Tu We Th Fr Sa
# 1 2 3 4
# 5 6 7 8 9 10 11
# 12 13 14 15 16 17 18
# 19 20 21 22 23 24 25
# 26 27 28 29 30 31
start = pd.Timestamp('2006-03-20', tz='UTC')
end = pd.Timestamp('2006-03-21', tz='UTC')
sim_params = factory.create_simulation_parameters(
start=start, end=end)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
self.assertIsNotNone(output)
last_prices = test_algo.last_prices[0]
oldest_dt = pd.Timestamp(
'2006-03-20 4:00 PM', tz='US/Eastern').tz_convert('UTC')
newest_dt = pd.Timestamp(
'2006-03-21 4:00 PM', tz='US/Eastern').tz_convert('UTC')
self.assertEquals(oldest_dt, last_prices.index[0])
self.assertEquals(newest_dt, last_prices.index[-1])
self.assertEquals(139.36946942498648, last_prices[oldest_dt])
self.assertEquals(180.15661995395106, last_prices[newest_dt])
def test_history_with_volume(self):
algo_text = """
from zipline.api import history, add_history, record
def initialize(context):
add_history(3, '1d', 'volume')
def handle_data(context, data):
volume = history(3, '1d', 'volume')
record(current_volume=volume[0].ix[-1])
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
start = pd.Timestamp('2007-04-10', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='minute'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
np.testing.assert_equal(output.ix[0, 'current_volume'],
212218404.0)
def test_history_with_high(self):
algo_text = """
from zipline.api import history, add_history, record
def initialize(context):
add_history(3, '1d', 'high')
def handle_data(context, data):
highs = history(3, '1d', 'high')
record(current_high=highs[0].ix[-1])
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
start = pd.Timestamp('2007-04-10', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='minute'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
np.testing.assert_equal(output.ix[0, 'current_high'],
139.5370641791925)
def test_history_with_low(self):
algo_text = """
from zipline.api import history, add_history, record
def initialize(context):
add_history(3, '1d', 'low')
def handle_data(context, data):
lows = history(3, '1d', 'low')
record(current_low=lows[0].ix[-1])
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
start = pd.Timestamp('2007-04-10', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='minute'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
np.testing.assert_equal(output.ix[0, 'current_low'],
99.891436939669944)
def test_history_with_open(self):
algo_text = """
from zipline.api import history, add_history, record
def initialize(context):
add_history(3, '1d', 'open_price')
def handle_data(context, data):
opens = history(3, '1d', 'open_price')
record(current_open=opens[0].ix[-1])
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
start = pd.Timestamp('2007-04-10', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='minute'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
np.testing.assert_equal(output.ix[0, 'current_open'],
99.991436939669939)
def test_history_passed_to_func(self):
"""
Had an issue where MagicMock was causing errors during validation
with rolling mean.
"""
algo_text = """
from zipline.api import history, add_history
import pandas as pd
def initialize(context):
add_history(2, '1d', 'price')
def handle_data(context, data):
prices = history(2, '1d', 'price')
pd.rolling_mean(prices, 2)
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
start = pd.Timestamp('2007-04-10', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='minute'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
# At this point, just ensure that there is no crash.
self.assertIsNotNone(output)
def test_history_passed_to_talib(self):
"""
Had an issue where MagicMock was causing errors during validation
with talib.
We don't officially support a talib integration, yet.
But using talib directly should work.
"""
algo_text = """
import talib
import numpy as np
from zipline.api import history, add_history, record
def initialize(context):
add_history(2, '1d', 'price')
def handle_data(context, data):
prices = history(2, '1d', 'price')
ma_result = talib.MA(np.asarray(prices[0]), timeperiod=2)
record(ma=ma_result[-1])
""".strip()
# April 2007
# Su Mo Tu We Th Fr Sa
# 1 2 3 4 5 6 7
# 8 9 10 11 12 13 14
# 15 16 17 18 19 20 21
# 22 23 24 25 26 27 28
# 29 30
# Eddie: this was set to 04-10 but I don't see how that makes
# sense as it does not generate enough data to get at -2 index
# below.
start = pd.Timestamp('2007-04-05', tz='UTC')
end = pd.Timestamp('2007-04-10', tz='UTC')
sim_params = SimulationParameters(
period_start=start,
period_end=end,
capital_base=float("1.0e5"),
data_frequency='minute',
emission_rate='daily'
)
test_algo = TradingAlgorithm(
script=algo_text,
data_frequency='minute',
sim_params=sim_params
)
source = RandomWalkSource(start=start,
end=end)
output = test_algo.run(source)
# At this point, just ensure that there is no crash.
self.assertIsNotNone(output)
recorded_ma = output.ix[-2, 'ma']
self.assertFalse(pd.isnull(recorded_ma))
# Depends on seed
np.testing.assert_almost_equal(recorded_ma,
159.76304468946876)
+38 -5
View File
@@ -54,6 +54,9 @@ from zipline.gens.composites import (
)
from zipline.gens.tradesimulation import AlgorithmSimulator
from zipline.history import HistorySpec
from zipline.history.history_container import HistoryContainer
DEFAULT_CAPITAL_BASE = float("1.0e5")
@@ -155,6 +158,9 @@ class TradingAlgorithm(object):
self.portfolio_needs_update = True
self._portfolio = None
self.history_container = None
self.history_specs = {}
# If string is passed in, execute and get reference to
# functions.
self.algoscript = kwargs.pop('script', None)
@@ -186,7 +192,6 @@ class TradingAlgorithm(object):
# an algorithm subclass needs to set initialized to True when
# it is fully initialized.
self.initialized = False
self.initialize(*args, **kwargs)
def initialize(self, *args, **kwargs):
@@ -198,6 +203,9 @@ class TradingAlgorithm(object):
set_algo_instance(None)
def handle_data(self, data):
if self.history_container:
self.history_container.update(data, self.datetime)
self._handle_data(self, data)
def __repr__(self):
@@ -350,19 +358,31 @@ class TradingAlgorithm(object):
# use the default params set with the algorithm.
# Else, we create simulation parameters using the start and end of the
# source provided.
if not sim_params:
if not self.sim_params:
if sim_params is None:
if self.sim_params is None:
start = source.start
end = source.end
sim_params = create_simulation_parameters(
start=start,
end=end,
capital_base=self.capital_base
capital_base=self.capital_base,
)
else:
sim_params = self.sim_params
# update sim params to ensure it's set
self.sim_params = sim_params
if self.sim_params.sids is None:
all_sids = [sid for s in self.sources for sid in s.sids]
self.sim_params.sids = set(all_sids)
# Create history containers
if len(self.history_specs) != 0:
self.history_container = HistoryContainer(
self.history_specs,
self.sim_params.sids,
self.sim_params.first_open)
# Create transforms by wrapping them into StatefulTransforms
self.transforms = []
for namestring, trans_descr in iteritems(self.registered_transforms):
@@ -667,3 +687,16 @@ class TradingAlgorithm(object):
"""
return self.blotter.open_orders
@api_method
def add_history(self, bar_count, frequency, field,
ffill=True):
history_spec = HistorySpec(bar_count, frequency, field, ffill)
self.history_specs[history_spec.key_str] = history_spec
@api_method
def history(self, bar_count, frequency, field, ffill=True):
spec_key_str = HistorySpec.spec_key(
bar_count, frequency, field, ffill)
history_spec = self.history_specs[spec_key_str]
return self.history_container.get_history(history_spec, self.datetime)
+3 -1
View File
@@ -225,7 +225,8 @@ class SimulationParameters(object):
def __init__(self, period_start, period_end,
capital_base=10e3,
emission_rate='daily',
data_frequency='daily'):
data_frequency='daily',
sids=None):
global environment
if not environment:
# This is the global environment for trading simulation.
@@ -237,6 +238,7 @@ class SimulationParameters(object):
self.emission_rate = emission_rate
self.data_frequency = data_frequency
self.sids = sids
assert self.period_start <= self.period_end, \
"Period start falls after period end."
+29
View File
@@ -0,0 +1,29 @@
#
# Copyright 2014 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . history import (
HistorySpec,
days_index_at_dt,
index_at_dt
)
from . import history_container
__all__ = [
'HistorySpec',
'days_index_at_dt',
'index_at_dt',
'history_container'
]
+135
View File
@@ -0,0 +1,135 @@
#
# Copyright 2014 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import numpy as np
import re
from zipline.finance import trading
def parse_freq_str(freq_str):
# TODO: Wish we were more aligned with pandas here.
num_str, unit_str = re.match('([0-9]+)([A-Za-z]+)', freq_str).groups()
return int(num_str), unit_str
class Frequency(object):
"""
Represents how the data is sampled, as specified by the algoscript
via units like "1d", "1m", etc.
Currently only one frequency is supported, "1d"
"1d" provides data keyed by closing, and the last minute of the current
day.
"""
def __init__(self, freq_str):
# The string the at the algoscript specifies.
# Hold onto to use a key for caching.
self.freq_str = freq_str
# num - The number of units of the frequency.
# unit_str - The unit type, e.g. 'd'
self.num, self.unit_str = parse_freq_str(freq_str)
class HistorySpec(object):
"""
Maps to the parameters of the history() call made by the algoscript
An object is used here so that get_history calls are not constantly
parsing the parameters and provides values for caching and indexing into
result frames.
"""
@classmethod
def spec_key(cls, bar_count, freq_str, field, ffill):
"""
Used as a hash/key value for the HistorySpec.
"""
return "{0}:{1}:{2}:{3}".format(
bar_count, freq_str, field, ffill)
def __init__(self, bar_count, frequency, field, ffill):
# Number of bars to look back.
self.bar_count = bar_count
if isinstance(frequency, str):
frequency = Frequency(frequency)
# The frequency at which the data is sampled.
self.frequency = frequency
# The field, e.g. 'price', 'volume', etc.
self.field = field
# Whether or not to forward fill the nan data.
self.ffill = ffill
# How many trading days the spec needs to look back.
# Used by index creation to see how large of an overarching window
# is needed.
self.days_needed = calculate_days_needed(
self.bar_count, self.frequency)
# Calculate the cache key string once.
self.key_str = self.spec_key(
bar_count, frequency.freq_str, field, ffill)
def calculate_days_needed(bar_count, freq):
""" Returns number trading days needed.
Overshoots so that we more than enough to sample from the current
frequency slot plus previous ones.
"""
if freq.unit_str == 'd':
return bar_count * freq.num
def days_index_at_dt(days_needed, algo_dt):
"""
The timestamps of previous days closes with the size of @days_needed
at @algo_dt.
"""
env = trading.environment
latest_algo_dt = algo_dt
current_index = env.open_and_closes.index.searchsorted(algo_dt.date())
previous_days_num = days_needed - 1
previous_days = env.open_and_closes['market_close'][
current_index - previous_days_num:current_index]
# Using the 'rawer' numpy array values here because of a bottleneck
# that appeared when using DatetimeIndex
return np.append(previous_days.values, latest_algo_dt)
def index_at_dt(history_spec, algo_dt):
"""
The index, including @algo_dt at the given @algo_dt for the count
and frequency of the @history_spec.
"""
days_index = days_index_at_dt(history_spec.days_needed, algo_dt)
frequency = history_spec.frequency
if frequency.unit_str == 'd':
index_of_algo_dt = days_index.searchsorted(algo_dt)
start_index = index_of_algo_dt + 1 - history_spec.bar_count
end_index = index_of_algo_dt + 1
return days_index[start_index:end_index]
+271
View File
@@ -0,0 +1,271 @@
#
# Copyright 2014 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd
from six import itervalues
from . history import (
index_at_dt,
days_index_at_dt,
)
from zipline.finance import trading
from zipline.utils.data import RollingPanel
# The closing price is referred to be multiple names,
# allow both for price rollover logic etc.
CLOSING_PRICE_FIELDS = {'price', 'close_price'}
def create_initial_day_panel(days_needed, fields, sids, dt):
index = days_index_at_dt(days_needed, dt)
# Use original index in case of 1 bar.
if days_needed != 1:
index = index[:-1]
window = len(index)
rp = RollingPanel(window, fields, sids)
for i, day in enumerate(index):
rp.index_buf[i] = day
rp.pos = window
return rp
def create_current_day_panel(fields, sids, dt):
# Can't use open_and_close since need to create enough space for a full
# day, even on a half day.
# Can now use mkt open and close, since we don't roll
env = trading.environment
index = env.market_minutes_for_day(dt)
return pd.Panel(items=fields, minor_axis=sids, major_axis=index)
def ffill_day_frame(field, day_frame, prior_day_frame):
# get values which are nan-at the beginning of the day
# and attempt to fill with the last close
first_bar = day_frame.ix[0]
nan_sids = first_bar[np.isnan(first_bar)]
for sid, _ in nan_sids.iterkv():
day_frame[sid][0] = prior_day_frame.ix[-1, sid]
if field != 'volume':
day_frame = day_frame.ffill()
return day_frame
class HistoryContainer(object):
"""
Container for all history panels and frames used by an algoscript.
To be used internally by algoproxy, but *not* passed directly to the
algorithm.
Entry point for the algoscript is the result of `get_history`.
"""
def __init__(self, history_specs, initial_sids, initial_dt):
# All of the history specs found by the algoscript parsing.
self.history_specs = history_specs
# The overaching panel needs to be large enough to contain the
# largest history spec
self.max_days_needed = max(spec.days_needed for spec
in itervalues(history_specs))
# The set of fields specified by all history specs
self.fields = set(spec.field for spec in itervalues(history_specs))
self.prior_day_panel = create_initial_day_panel(
self.max_days_needed, self.fields, initial_sids, initial_dt)
# This panel contains the minutes for the current day.
# The value that is used is some sort of aggregation call on the
# panel, e.g. `sum` for volume, `max` for high, etc.
self.current_day_panel = create_current_day_panel(
self.fields, initial_sids, initial_dt)
# Helps prop up the prior day panel against having a nan, when
# the data has been seen.
self.last_known_prior_values = {field: {} for field in self.fields}
# Populating initial frames here, so that the cost of creating the
# initial frames does not show up when profiling get_y
# These frames are cached since mid-stream creation of containing
# data frames on every bar is expensive.
self.return_frames = {}
self.create_return_frames(initial_dt)
def create_return_frames(self, algo_dt):
"""
Populates the return frame cache.
Called during init and at universe rollovers.
"""
for history_spec in itervalues(self.history_specs):
index = index_at_dt(history_spec, algo_dt)
index = pd.to_datetime(index)
frame = pd.DataFrame(
index=index,
columns=map(int, self.current_day_panel.minor_axis.values),
dtype=np.float64)
self.return_frames[history_spec] = frame
def update(self, data, algo_dt):
"""
Takes the bar at @algo_dt's @data and adds to the current day panel.
"""
self.check_and_roll(algo_dt)
fields = self.fields
field_data = {sid: {field: bar[field] for field in fields}
for sid, bar in data.iteritems()
if (bar
and
bar['dt'] == algo_dt
and
# Only use data which is keyed in the data panel.
# Prevents crashes due to custom data.
sid in self.current_day_panel.minor_axis)}
field_frame = pd.DataFrame(field_data)
self.current_day_panel.ix[:, algo_dt, :] = field_frame.T
def roll(self, roll_dt):
env = trading.environment
# This should work for price, but not others, e.g.
# open.
# Get the most recent value.
rolled = pd.DataFrame(
index=self.current_day_panel.items,
columns=self.current_day_panel.minor_axis)
for field in self.fields:
if field in CLOSING_PRICE_FIELDS:
# Use the last price.
prices = self.current_day_panel.ffill().ix[field, -1, :]
rolled.ix[field] = prices
elif field == 'open_price':
# Use the first price.
opens = self.current_day_panel.ix['open_price', 0, :]
rolled.ix['open_price'] = opens
elif field == 'volume':
# Volume is the sum of the volumes during the
# course of the day
volumes = self.current_day_panel.ix['volume'].apply(np.sum)
rolled.ix['volume'] = volumes
elif field == 'high':
# Use the highest high.
highs = self.current_day_panel.ix['high'].apply(np.max)
rolled.ix['high'] = highs
elif field == 'low':
# Use the lowest low.
lows = self.current_day_panel.ix['low'].apply(np.min)
rolled.ix['low'] = lows
for sid, value in rolled.ix[field].iterkv():
if not np.isnan(value):
try:
prior_values = self.last_known_prior_values[field][sid]
except KeyError:
prior_values = {}
self.last_known_prior_values[field][sid] = prior_values
prior_values['dt'] = roll_dt
prior_values['value'] = value
self.prior_day_panel.add_frame(roll_dt, rolled)
# Create a new 'current day' collector.
next_day = env.next_trading_day(roll_dt)
if next_day:
# Only create the next panel if there is a next day.
# i.e. don't create the next panel on the last day of
# the backest/current day of live trading.
self.current_day_panel = create_current_day_panel(
self.fields,
# Will break on quarter rollover.
self.current_day_panel.minor_axis,
next_day)
def check_and_roll(self, algo_dt):
"""
Check whether the algo_dt is at the end of a day.
If it is, aggregate the day's minute data and store it in the prior
day panel.
"""
# Use a while loop to account for illiquid bars.
while algo_dt > self.current_day_panel.major_axis[-1]:
roll_dt = self.current_day_panel.major_axis[-1]
self.roll(roll_dt)
def get_history(self, history_spec, algo_dt):
"""
Main API used by the algoscript is mapped to this function.
Selects from the overarching history panel the values for the
@history_spec at the given @algo_dt.
"""
field = history_spec.field
index = index_at_dt(history_spec, algo_dt)
index = pd.to_datetime(index)
frame = self.return_frames[history_spec]
# Overwrite the index.
# Not worrying about values here since the values are overwritten
# in the next step.
frame.index = index
prior_day_panel = self.prior_day_panel.get_current()
prior_day_frame = prior_day_panel[field].copy()
if history_spec.ffill:
first_bar = prior_day_frame.ix[0]
nan_sids = first_bar[first_bar.isnull()]
for sid, _ in nan_sids.iterkv():
try:
if (
# Only use prior value if it is before the index,
# so that a backfill does not accidentally occur.
self.last_known_prior_values[field][sid]['dt'] <=
prior_day_frame.index[0]):
prior_day_frame[sid][0] =\
self.last_known_prior_values[field][sid]['value']
except KeyError:
# Allow case where there is no previous value.
# e.g. with leading nans.
pass
prior_day_frame = prior_day_frame.ffill()
frame.ix[:-1] = prior_day_frame.ix[:]
# Copy the current day frame, since the fill behavior will mutate
# the values in the panel.
current_day_frame = self.current_day_panel[field][:algo_dt].copy()
if history_spec.ffill:
current_day_frame = ffill_day_frame(field,
current_day_frame,
prior_day_frame)
if field == 'volume':
# This works for the day rollup, i.e. '1d',
# but '1m' will need to allow for 0 or nan minutes
frame.ix[algo_dt] = current_day_frame.sum()
elif field == 'high':
frame.ix[algo_dt] = current_day_frame.max()
elif field == 'low':
frame.ix[algo_dt] = current_day_frame.min()
elif field == 'open_price':
frame.ix[algo_dt] = current_day_frame.ix[0]
else:
frame.ix[algo_dt] = current_day_frame.ix[algo_dt]
return frame
+2
View File
@@ -81,6 +81,8 @@ class RandomWalkSource(DataSource):
self.drift = .1
self.sd = .1
self.sids = self.start_prices.keys()
self.open_and_closes = \
calendar.open_and_closes[self.start:self.end]
+3 -2
View File
@@ -42,8 +42,8 @@ __all__ = ['load_from_yahoo', 'load_bars_from_yahoo']
def create_simulation_parameters(year=2006, start=None, end=None,
capital_base=float("1.0e5"),
num_days=None, load=None
):
num_days=None, load=None,
sids=None):
"""Construct a complete environment with reasonable defaults"""
if start is None:
start = datetime(year, 1, 1, tzinfo=pytz.utc)
@@ -59,6 +59,7 @@ def create_simulation_parameters(year=2006, start=None, end=None,
period_start=start,
period_end=end,
capital_base=capital_base,
sids=sids,
)
return sim_params