Files
catalyst/tests/test_transforms.py
T
2012-08-15 16:16:36 -04:00

315 lines
9.9 KiB
Python

import pytz
import numpy
from datetime import timedelta, datetime
from collections import defaultdict
from unittest2 import TestCase
from zipline import ndict
from zipline.lines import SimulatedTrading
from zipline.utils.test_utils import setup_logger, teardown_logger
from zipline.utils.date_utils import utcnow
from zipline.gens.tradegens import SpecificEquityTrades
from zipline.gens.transform import StatefulTransform, EventWindow
from zipline.gens.vwap import VWAP
from zipline.gens.mavg import MovingAverage
from zipline.gens.stddev import MovingStandardDev
from zipline.gens.returns import Returns
import zipline.utils.factory as factory
def to_dt(msg):
return ndict({'dt': msg})
class NoopEventWindow(EventWindow):
"""
A no-op EventWindow subclass for testing the base EventWindow logic.
Keeps lists of all added and dropped events.
"""
def __init__(self, market_aware, days, delta):
EventWindow.__init__(self, market_aware, days, delta)
self.added = []
self.removed = []
def handle_add(self, event):
self.added.append(event)
def handle_remove(self, event):
self.removed.append(event)
class EventWindowTestCase(TestCase):
def setUp(self):
setup_logger(self)
# Constants calling before open, during the day, and after
# close on a valid trading day.
self.pre_open = datetime(2012, 8, 7, 13, tzinfo = pytz.utc)
self.mid_day = datetime(2012, 8, 7, 15, tzinfo = pytz.utc)
self.post_close = datetime(2012, 8, 7, 22, tzinfo = pytz.utc)
# Constants calling before open, during the day, and after
# close on a saturday.
self.pre_open_saturday = datetime(2012, 8, 11, 13, tzinfo = pytz.utc)
self.mid_day_saturday = datetime(2012, 8, 11, 15, tzinfo = pytz.utc)
self.post_close_saturday = datetime(2012, 8, 11, 22, tzinfo = pytz.utc)
# Constants calling before open, during the day, and after
# close on a holiday.
self.pre_open_holiday = datetime(2012, 12, 25, 13, tzinfo = pytz.utc)
self.mid_day_holiday = datetime(2012, 12, 25, tzinfo = pytz.utc)
self.post_close_holiday = datetime(2012, 12, 25, 22, tzinfo = pytz.utc)
def test_event_window_with_timedelta(self):
# Keep all events within a 5 minute window.
window = NoopEventWindow(
market_aware = False,
delta = timedelta(minutes = 5),
days = None
)
now = utcnow()
# 15 dates, increasing in 1 minute increments.
dates = [now + i * timedelta(minutes = 1)
for i in xrange(15)]
# Turn the dates into the format required by EventWindow.
dt_messages = [to_dt(date) for date in dates]
# Run all messages through the window and assert that we're adding
# and removing messages appropriately. We start the enumeration at 1
# for convenience.
for num, message in enumerate(dt_messages, 1):
window.update(message)
# Assert that we've added the correct number of events.
assert len(window.added) == num
# Assert that we removed only events that fall outside (or
# on the boundary of) the delta.
for dropped in window.removed:
assert message.dt - dropped.dt >= timedelta(minutes = 5)
def test_market_aware_window(self):
window = NoopEventWindow(
market_aware = True,
delta = None,
days = 1
)
dates = ([self.pre_open]*3)
dates += ([self.mid_day]*3)
dates += ([self.post_close]*3)
dates += [self.pre_open + timedelta(days = 1, seconds = 1)]
events = [to_dt(date) for date in dates]
# Run the events.
for event in events:
window.update(event)
# We should have removed the pre_open events on the first day.
# The rest should be intact.
assert window.added == events
assert window.removed == events[0:3]
assert list(window.ticks) == events[3:]
def test_market_aware_window_weekend(self):
window = NoopEventWindow(
market_aware = True,
delta = None,
days = 2
)
dates = [self.pre_open_saturday - timedelta(days = 1, seconds=1)]
dates += [self.mid_day_saturday - timedelta(days = 1, seconds=1)]
dates += [self.post_close_saturday - timedelta(days = 1, seconds=1)]
dates += [self.mid_day_saturday + timedelta(days = 1)]
events = [to_dt(date) for date in dates]
# Run the events.
for event in events:
window.update(event)
# We shouldn't remove any events.
assert window.added == events
assert window.removed == []
assert list(window.ticks) == events
extra = to_dt(self.mid_day_saturday + timedelta(days = 2))
window.update(extra)
# We should remove only the first event.
assert window.removed == [events[0]]
assert list(window.ticks) == events[1:] + [extra]
def tearDown(self):
setup_logger(self)
class FinanceTransformsTestCase(TestCase):
def setUp(self):
self.trading_environment = factory.create_trading_environment()
setup_logger(self)
trade_history = factory.create_trade_history(
133,
[10.0, 10.0, 11.0, 11.0],
[100, 100, 100, 300],
timedelta(days=1),
self.trading_environment
)
self.source = SpecificEquityTrades(event_list=trade_history)
def tearDown(self):
self.log_handler.pop_application()
def test_vwap(self):
vwap = StatefulTransform(
VWAP,
market_aware = False,
delta = timedelta(days = 2)
)
transformed = list(vwap.transform(self.source))
# Output values
tnfm_vals = [message.tnfm_value for message in transformed]
# "Hand calculated" values.
expected = [
(10.0 * 100) / 100.0,
((10.0 * 100) + (10.0 * 100)) / (200.0),
# We should drop the first event here.
((10.0 * 100) + (11.0 * 100)) / (200.0),
# We should drop the second event here.
((11.0 * 100) + (11.0 * 300)) / (400.0)
]
# Output should match the expected.
assert tnfm_vals == expected
def test_returns(self):
# Daily returns.
returns = StatefulTransform(Returns, 1)
transformed = list(returns.transform(self.source))
tnfm_vals = [message.tnfm_value for message in transformed]
# No returns for the first event because we don't have a
# previous close.
expected = [0.0, 0.0, 0.1, 0.0]
assert tnfm_vals == expected
# Two-day returns. An extra kink here is that the
# factory will automatically skip a weekend for the
# last event. Results shouldn't notice this blip.
trade_history = factory.create_trade_history(
133,
[10.0, 15.0, 13.0, 12.0, 13.0],
[100, 100, 100, 300, 100],
timedelta(days=1),
self.trading_environment
)
self.source = SpecificEquityTrades(event_list=trade_history)
returns = StatefulTransform(Returns, 2)
transformed = list(returns.transform(self.source))
tnfm_vals = [message.tnfm_value for message in transformed]
expected = [
0.0,
0.0,
(13.0 - 10.0) / 10.0,
(12.0 - 15.0) / 15.0,
(13.0 - 13.0) / 13.0
]
assert tnfm_vals == expected
def test_moving_average(self):
mavg = StatefulTransform(
MovingAverage,
market_aware = False,
fields = ['price', 'volume'],
delta = timedelta(days = 2),
)
transformed = list(mavg.transform(self.source))
# Output values.
tnfm_prices = [message.tnfm_value.price for message in transformed]
tnfm_volumes = [message.tnfm_value.volume for message in transformed]
# "Hand-calculated" values
expected_prices = [
((10.0) / 1.0),
((10.0 + 10.0) / 2.0),
# First event should get dropped here.
((10.0 + 11.0) / 2.0),
# Second event should get dropped here.
((11.0 + 11.0) / 2.0)
]
expected_volumes = [
((100.0) / 1.0),
((100.0 + 100.0) / 2.0),
# First event should get dropped here.
((100.0 + 100.0) / 2.0),
# Second event should get dropped here.
((100.0 + 300.0) / 2.0)
]
assert tnfm_prices == expected_prices
assert tnfm_volumes == expected_volumes
def test_moving_stddev(self):
trade_history = factory.create_trade_history(
133,
[10.0, 15.0, 13.0, 12.0],
[100, 100, 100, 100],
timedelta(hours = 1),
self.trading_environment
)
stddev = StatefulTransform(
MovingStandardDev,
market_aware = False,
delta = timedelta(minutes = 150),
)
self.source = SpecificEquityTrades(event_list=trade_history)
transformed = list(stddev.transform(self.source))
vals = [message.tnfm_value for message in transformed]
expected = [
None,
numpy.std([10.0, 15.0], ddof = 1),
numpy.std([10.0, 15.0, 13.0], ddof = 1),
numpy.std([15.0, 13.0, 12.0], ddof = 1),
]
# numpy has odd rounding behavior, cf.
# http://docs.scipy.org/doc/numpy/reference/generated/numpy.std.html
for v1, v2 in zip(vals, expected):
if v1 == None:
assert v2 == None
continue
assert round(v1, 5) == round(v2, 5)