Files
catalyst/tests/test_perf_tracking.py
T
Eddie Hebert 2debde31ba BLD/STY: Upgrade to latest versions of lint checkers.
Upgrade pep8 1.4.6 -> 1.5.7
Upgrade pyflakes 0.7.3 -> 0.8.1

Also, tweak some line indentations which now show up as errors,
because of the fixes/changes to visual indent detection between
pep8 versions.
2014-05-30 12:44:10 -04:00

1397 lines
46 KiB
Python

#
# Copyright 2013 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import collections
import logging
import operator
import unittest
from nose_parameterized import parameterized
import datetime
import pytz
import itertools
from six.moves import range, zip
import zipline.utils.factory as factory
import zipline.finance.performance as perf
from zipline.finance.slippage import Transaction, create_transaction
import zipline.utils.math_utils as zp_math
from zipline.gens.composites import date_sorted_sources
from zipline.finance.trading import SimulationParameters
from zipline.finance.blotter import Order
from zipline.finance.commission import PerShare, PerTrade, PerDollar
from zipline.finance import trading
from zipline.protocol import DATASOURCE_TYPE
from zipline.utils.factory import create_random_simulation_parameters
import zipline.protocol
from zipline.protocol import Event
# Module-level logger used by the test cases in this file.
logger = logging.getLogger('Test Perf Tracking')
# Time steps passed to the factory helpers to space out synthetic trades.
onesec = datetime.timedelta(seconds=1)
oneday = datetime.timedelta(days=1)
# 6.5 hours -- presumably the length of a regular trading session; it is not
# referenced in the visible part of this file (TODO confirm it is still used).
tradingday = datetime.timedelta(hours=6, minutes=30)
def create_txn(event, price, amount):
    """
    Build a transaction for `amount` shares at `price` from `event`.

    The transaction is backed by a placeholder order on the event's sid and
    is tagged with the source id 'MockTransactionSource'.
    """
    placeholder_order = Order(None, None, event.sid, id=None)
    transaction = create_transaction(event, placeholder_order, price, amount)
    transaction.source_id = 'MockTransactionSource'
    return transaction
def benchmark_events_in_range(sim_params):
    """
    Return one BENCHMARK event per benchmark return inside the simulation.

    A return is kept when its calendar date lies between the dates of
    sim_params.period_start and sim_params.period_end, both inclusive.
    """
    bm_events = []
    # NOTE(review): iterkv() looks like the deprecated pandas alias of
    # iteritems() -- confirm the type of benchmark_returns before modernizing.
    for dt, ret in trading.environment.benchmark_returns.iterkv():
        in_range = (sim_params.period_start.date()
                    <= dt.date()
                    <= sim_params.period_end.date())
        if not in_range:
            continue
        bm_events.append(Event({
            'dt': dt,
            'returns': ret,
            'type': zipline.protocol.DATASOURCE_TYPE.BENCHMARK,
            'source_id': 'benchmarks',
        }))
    return bm_events
def calculate_results(host, events):
    """
    Run `events` through a fresh PerformanceTracker and collect its output.

    `host` supplies `sim_params` and `benchmark_events`. The events are
    sorted by dt, merged with the benchmark events, and cut off after the
    dt of the last supplied event. Events are processed grouped by dt; each
    group that contains a benchmark event is followed by a market close,
    and the messages returned by those closes are returned as a list.
    """
    tracker = perf.PerformanceTracker(host.sim_params)
    by_dt = operator.attrgetter('dt')

    ordered = sorted(events, key=by_dt)
    merged = date_sorted_sources(ordered, host.benchmark_events)
    # Kept lazy on purpose: the cutoff is only looked up while iterating.
    in_window = (candidate for candidate in merged
                 if candidate.dt <= ordered[-1].dt)

    messages = []
    for _, same_dt_events in itertools.groupby(in_window, by_dt):
        saw_benchmark = False
        for event in same_dt_events:
            tracker.process_event(event)
            if event.type == DATASOURCE_TYPE.BENCHMARK:
                saw_benchmark = True
        if not saw_benchmark:
            continue
        messages.append(tracker.handle_market_close())
    return messages
class TestSplitPerformance(unittest.TestCase):
    """Checks how a split changes an open long position and the cash."""

    def setUp(self):
        params, start, end = create_random_simulation_parameters()
        self.sim_params = params
        self.dt = start
        self.end_dt = end
        # start with $10,000
        self.sim_params.capital_base = 10e3
        self.benchmark_events = benchmark_events_in_range(self.sim_params)

    def test_split_long_position(self):
        """A ratio-3 split resizes the position; the remainder is cash."""
        with trading.TradingEnvironment() as env:
            events = factory.create_trade_history(
                1, [20, 20], [100, 100], oneday, self.sim_params)

            # Open a long position in sid 1:
            # 100 shares at $20 apiece = $2000 position.
            opening_txn = create_txn(events[0], 20, 100)
            events.insert(0, opening_txn)

            # Add a split with ratio 3, dated on the trading day that
            # follows events[1].
            split_dt = env.next_trading_day(events[1].dt)
            events.append(factory.create_split(1, 3, split_dt))

            results = calculate_results(self, events)

            # Expect 33 shares (at $60 apiece) and $20 in cash.
            self.assertEqual(2, len(results))

            final_daily = results[1]['daily_perf']
            final_positions = final_daily['positions']
            self.assertEqual(1, len(final_positions))

            # The one remaining position must reflect the split.
            held = final_positions[0]
            expected_fields = (
                ('sid', 1),
                ('amount', 33),
                ('cost_basis', 60),
                ('last_sale_price', 60),
            )
            for field, expected in expected_fields:
                self.assertEqual(expected, held[field])

            # Starting from $10000, $2000 went into the position and $20
            # came back, so cash should be $8020 or close to it. A split
            # is sometimes denoted as a ratio like 0.3333, which loses some
            # digits of precision, hence the tolerant comparison.
            cash_matches = zp_math.tolerant_equals(
                8020, final_daily['ending_cash'], 1)
            self.assertTrue(cash_matches)

            perf_kinds = ('daily_perf', 'cumulative_perf')
            for day, message in enumerate(results):
                for kind in perf_kinds:
                    snapshot = message[kind]
                    # Prices never change, so pnl and returns stay at 0.0.
                    self.assertEqual(
                        0.0, snapshot['pnl'],
                        "day %s %s pnl %s instead of 0.0" %
                        (day, kind, snapshot['pnl']))
                    self.assertEqual(
                        0.0, snapshot['returns'],
                        "day %s %s returns %s instead of 0.0" %
                        (day, kind, snapshot['returns']))
class TestCommissionEvents(unittest.TestCase):
    """
    Checks commission models and the effect of commission events on cash
    and cost basis, as reported by calculate_results.
    """

    def setUp(self):
        self.sim_params, self.dt, self.end_dt = \
            create_random_simulation_parameters()
        # Pass the values as logging arguments so the message is only
        # rendered when the record is actually emitted.
        logger.info("sim_params: %s, dt: %s, end_dt: %s",
                    self.sim_params, self.dt, self.end_dt)
        self.sim_params.capital_base = 10e3
        self.benchmark_events = benchmark_events_in_range(self.sim_params)

    def _flat_trade_history(self):
        """Return five trades of sid 1, price 10, volume 100, a day apart."""
        return factory.create_trade_history(
            1,
            [10, 10, 10, 10, 10],
            [100, 100, 100, 100, 100],
            oneday,
            self.sim_params
        )

    def _commission_event(self):
        """Return a 300.0 commission on sid 1, 3 hours after first open."""
        cash_adj_dt = self.sim_params.first_open \
            + datetime.timedelta(hours=3)
        return factory.create_commission(1, 300.0, cash_adj_dt)

    def test_commission_event(self):
        with trading.TradingEnvironment():
            events = self._flat_trade_history()

            # Test commission models and validate result
            # Expected commission amounts:
            # PerShare commission: 1.00, 1.00, 1.50 = $3.50
            # PerTrade commission: 5.00, 5.00, 5.00 = $15.00
            # PerDollar commission: 1.50, 3.00, 4.50 = $9.00
            # Total commission = $3.50 + $15.00 + $9.00 = $27.50

            # Create 3 transactions: 50, 100, 150 shares traded @ $20
            transactions = [create_txn(events[0], 20, shares)
                            for shares in (50, 100, 150)]

            # Create commission models
            models = [PerShare(cost=0.01, min_trade_cost=1.00),
                      PerTrade(cost=5.00),
                      PerDollar(cost=0.0015)]

            # Aggregate commission amounts
            total_commission = sum(
                model.calculate(trade)[1]
                for model in models
                for trade in transactions
            )
            # The total is a sum of floats; do not rely on exact equality.
            self.assertAlmostEqual(total_commission, 27.5)

            cash_adjustment = self._commission_event()

            # Insert a purchase order.
            events.insert(0, create_txn(events[0], 20, 1))
            events.insert(1, cash_adjustment)
            results = calculate_results(self, events)

            # Validate that we lost 320 dollars from our cash pool.
            self.assertEqual(
                results[-1]['cumulative_perf']['ending_cash'],
                9680)
            # Validate that the cost basis of our position changed.
            last_positions = results[-1]['daily_perf']['positions']
            self.assertEqual(last_positions[0]['cost_basis'], 320.0)

    def test_commission_zero_position(self):
        """
        Ensure no div-by-zero errors.
        """
        with trading.TradingEnvironment():
            events = self._flat_trade_history()
            cash_adjustment = self._commission_event()

            # Insert a purchase order.
            events.insert(0, create_txn(events[0], 20, 1))
            # Sell that order.
            events.insert(1, create_txn(events[1], 20, -1))
            events.insert(2, cash_adjustment)
            results = calculate_results(self, events)

            # Validate that we lost 300 dollars from our cash pool.
            self.assertEqual(
                results[-1]['cumulative_perf']['ending_cash'],
                9700)

    def test_commission_no_position(self):
        """
        Ensure no position-not-found or sid-not-found errors.
        """
        with trading.TradingEnvironment():
            events = self._flat_trade_history()
            cash_adjustment = self._commission_event()

            events.insert(0, cash_adjustment)
            results = calculate_results(self, events)

            # Validate that we lost 300 dollars from our cash pool.
            self.assertEqual(
                results[-1]['cumulative_perf']['ending_cash'],
                9700)
class TestDividendPerformance(unittest.TestCase):
    """
    Dividend scenarios run through calculate_results.

    Each test builds a flat trade history (price 10, volume 100, one trade
    per `oneday` step), adds a dividend plus optional transactions, and
    checks the per-day returns and cash flows of the resulting messages.
    """

    def setUp(self):
        self.sim_params, self.dt, self.end_dt = \
            create_random_simulation_parameters()
        self.sim_params.capital_base = 10e3
        self.benchmark_events = benchmark_events_in_range(self.sim_params)

    def _flat_trade_history(self, sid=1, days=5):
        """Return `days` trades of `sid` at price 10 and volume 100."""
        return factory.create_trade_history(
            sid,
            [10] * days,
            [100] * days,
            oneday,
            self.sim_params
        )

    def _assert_perf_series(self, results, period, field, expected):
        """Assert that `field` of `period` across all results == expected."""
        actual = [message[period][field] for message in results]
        self.assertEqual(actual, expected)

    def test_market_hours_calculations(self):
        with trading.TradingEnvironment():
            # DST in US/Eastern began on Sunday March 14, 2010
            before = datetime.datetime(2010, 3, 12, 14, 31, tzinfo=pytz.utc)
            after = factory.get_next_trading_dt(
                before,
                datetime.timedelta(days=1)
            )
            self.assertEqual(after.hour, 13)

    def test_long_position_receives_dividend(self):
        with trading.TradingEnvironment():
            # post some trades in the market
            events = self._flat_trade_history()
            dividend = factory.create_dividend(
                1,
                10.00,
                # declared date, when the algorithm finds out about
                # the dividend
                events[1].dt,
                # ex_date, when the algorithm is credited with the
                # dividend
                events[1].dt,
                # pay date, when the algorithm receives the dividend.
                events[2].dt
            )

            txn = create_txn(events[0], 10.0, 100)
            events.insert(0, txn)
            events.insert(1, dividend)
            results = calculate_results(self, events)

            self.assertEqual(len(results), 5)
            self._assert_perf_series(
                results, 'cumulative_perf', 'returns',
                [0.0, 0.0, 0.1, 0.1, 0.1])
            self._assert_perf_series(
                results, 'daily_perf', 'returns',
                [0.0, 0.0, 0.10, 0.0, 0.0])
            self._assert_perf_series(
                results, 'daily_perf', 'capital_used',
                [-1000, 0, 1000, 0, 0])
            self._assert_perf_series(
                results, 'cumulative_perf', 'capital_used',
                [-1000, -1000, 0, 0, 0])
            self._assert_perf_series(
                results, 'cumulative_perf', 'ending_cash',
                [9000, 9000, 10000, 10000, 10000])

    def test_long_position_receives_stock_dividend(self):
        with trading.TradingEnvironment():
            # post some trades in the market
            events = []
            for sid in (1, 2):
                events.extend(self._flat_trade_history(sid))

            dividend = factory.create_stock_dividend(
                1,
                payment_sid=2,
                ratio=2,
                # declared date, when the algorithm finds out about
                # the dividend
                declared_date=events[1].dt,
                # ex_date, when the algorithm is credited with the
                # dividend
                ex_date=events[1].dt,
                # pay date, when the algorithm receives the dividend.
                pay_date=events[2].dt
            )

            txn = create_txn(events[0], 10.0, 100)
            events.insert(0, txn)
            events.insert(1, dividend)
            results = calculate_results(self, events)

            self.assertEqual(len(results), 5)
            self._assert_perf_series(
                results, 'cumulative_perf', 'returns',
                [0.0, 0.0, 0.2, 0.2, 0.2])
            self._assert_perf_series(
                results, 'daily_perf', 'returns',
                [0.0, 0.0, 0.2, 0.0, 0.0])
            self._assert_perf_series(
                results, 'daily_perf', 'capital_used',
                [-1000, 0, 0, 0, 0])
            self._assert_perf_series(
                results, 'cumulative_perf', 'capital_used',
                [-1000] * 5)
            self._assert_perf_series(
                results, 'cumulative_perf', 'ending_cash',
                [9000] * 5)

    def test_post_ex_long_position_receives_no_dividend(self):
        # post some trades in the market
        events = self._flat_trade_history()
        dividend = factory.create_dividend(
            1,
            10.00,
            events[0].dt,
            events[1].dt,
            events[2].dt
        )
        events.insert(1, dividend)
        txn = create_txn(events[3], 10.0, 100)
        events.insert(4, txn)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 5)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns', [0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'returns', [0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used', [0, 0, -1000, 0, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used',
            [0, 0, -1000, -1000, -1000])

    def test_selling_before_dividend_payment_still_gets_paid(self):
        # post some trades in the market
        events = self._flat_trade_history()
        dividend = factory.create_dividend(
            1,
            10.00,
            events[0].dt,
            events[1].dt,
            events[3].dt
        )
        buy_txn = create_txn(events[0], 10.0, 100)
        events.insert(1, buy_txn)
        sell_txn = create_txn(events[3], 10.0, -100)
        events.insert(4, sell_txn)
        events.insert(0, dividend)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 5)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns', [0, 0, 0, 0.1, 0.1])
        self._assert_perf_series(
            results, 'daily_perf', 'returns', [0, 0, 0, 0.1, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used',
            [-1000, 0, 1000, 1000, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used',
            [-1000, -1000, 0, 1000, 1000])

    def test_buy_and_sell_before_ex(self):
        # post some trades in the market
        events = self._flat_trade_history(days=6)
        dividend = factory.create_dividend(
            1,
            10.00,
            events[3].dt,
            events[4].dt,
            events[5].dt
        )
        buy_txn = create_txn(events[1], 10.0, 100)
        events.insert(1, buy_txn)
        sell_txn = create_txn(events[3], 10.0, -100)
        events.insert(3, sell_txn)
        events.insert(1, dividend)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 6)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns', [0, 0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'returns', [0, 0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used',
            [0, -1000, 1000, 0, 0, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used',
            [0, -1000, 0, 0, 0, 0])

    def test_ending_before_pay_date(self):
        # post some trades in the market
        events = self._flat_trade_history()

        pay_date = self.sim_params.first_open
        # find pay date that is much later.
        for _ in range(30):
            pay_date = factory.get_next_trading_dt(pay_date, oneday)
        dividend = factory.create_dividend(
            1,
            10.00,
            events[0].dt,
            events[1].dt,
            pay_date
        )

        buy_txn = create_txn(events[1], 10.0, 100)
        events.insert(2, buy_txn)
        events.insert(1, dividend)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 5)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns', [0, 0, 0, 0.0, 0.0])
        self._assert_perf_series(
            results, 'daily_perf', 'returns', [0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used', [0, -1000, 0, 0, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used',
            [0, -1000, -1000, -1000, -1000])

    def test_short_position_pays_dividend(self):
        # post some trades in the market
        events = self._flat_trade_history()
        dividend = factory.create_dividend(
            1,
            10.00,
            # declare at open of test
            events[0].dt,
            # ex_date same as trade 2
            events[2].dt,
            events[3].dt
        )
        txn = create_txn(events[1], 10.0, -100)
        events.insert(1, txn)
        events.insert(0, dividend)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 5)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns',
            [0.0, 0.0, 0.0, -0.1, -0.1])
        self._assert_perf_series(
            results, 'daily_perf', 'returns',
            [0.0, 0.0, 0.0, -0.1, 0.0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used', [0, 1000, 0, -1000, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used',
            [0, 1000, 1000, 0, 0])

    def test_no_position_receives_no_dividend(self):
        # post some trades in the market
        events = self._flat_trade_history()
        dividend = factory.create_dividend(
            1,
            10.00,
            events[0].dt,
            events[1].dt,
            events[2].dt
        )
        events.insert(1, dividend)
        results = calculate_results(self, events)

        self.assertEqual(len(results), 5)
        self._assert_perf_series(
            results, 'cumulative_perf', 'returns',
            [0.0, 0.0, 0.0, 0.0, 0.0])
        self._assert_perf_series(
            results, 'daily_perf', 'returns',
            [0.0, 0.0, 0.0, 0.0, 0.0])
        self._assert_perf_series(
            results, 'daily_perf', 'capital_used', [0, 0, 0, 0, 0])
        self._assert_perf_series(
            results, 'cumulative_perf', 'capital_used', [0, 0, 0, 0, 0])
class TestDividendPerformanceHolidayStyle(TestDividendPerformance):
    """Re-runs the inherited dividend tests over a fixed date range."""

    # The holiday variant pins the start of the simulation so that the next
    # trading day is two days ahead (the original note describes the start
    # as the day before Thanksgiving). Any test that hard-codes events at
    # start + oneday will fail, since the simulation skips those events.
    #
    # NOTE(review): the start used below is 2003-11-30, which does not look
    # like the day before Thanksgiving 2003 -- confirm the intended date.
    # Unlike the parent setUp, capital_base is not assigned here.

    def setUp(self):
        period_start = datetime.datetime(2003, 11, 30, tzinfo=pytz.utc)
        period_end = datetime.datetime(2004, 11, 25, tzinfo=pytz.utc)
        self.dt = period_start
        self.end_dt = period_end
        self.sim_params = SimulationParameters(self.dt, self.end_dt)
        self.benchmark_events = benchmark_events_in_range(self.sim_params)
class TestPositionPerformance(unittest.TestCase):
    """
    Drives perf.PerformancePeriod directly with hand-built trades and
    transactions in sid 1 and checks positions, cash flow and pnl.
    """

    def setUp(self):
        self.sim_params, self.dt, self.end_dt = \
            create_random_simulation_parameters()
        self.benchmark_events = benchmark_events_in_range(self.sim_params)

    def _assert_sole_position(self, period, sid, amount, cost_basis,
                              last_sale_price):
        """
        Assert that `period` holds exactly one position and that the
        position stored under key 1 has the given sid, amount, cost basis
        and last sale price.
        """
        self.assertEqual(
            len(period.positions),
            1,
            "should be just one position"
        )
        # Every scenario in this class trades sid 1.
        position = period.positions[1]
        self.assertEqual(
            position.sid,
            sid,
            "position should be in security from the transaction"
        )
        self.assertEqual(
            position.amount,
            amount,
            "should have a position of {sharecount} shares".format(
                sharecount=amount)
        )
        self.assertEqual(
            position.cost_basis,
            cost_basis,
            "should have a cost basis of {basis}".format(basis=cost_basis)
        )
        self.assertEqual(
            position.last_sale_price,
            last_sale_price,
            "last sale should be price of last trade. "
            "expected {exp} actual {act}".format(
                exp=last_sale_price,
                act=position.last_sale_price)
        )

    def _execute_checking_average_cost(self, period, transactions):
        """
        Execute `transactions` on `period` one by one, asserting after each
        that the cost basis of position 1 equals the running mean of the
        transaction prices seen so far.
        """
        average_cost = 0
        for i, txn in enumerate(transactions):
            period.execute_transaction(txn)
            average_cost = (average_cost * i + txn.price) / (i + 1)
            self.assertEqual(period.positions[1].cost_basis, average_cost)

    def test_long_position(self):
        """
        verify that the performance period calculates properly for a
        single buy transaction
        """
        # post some trades in the market
        trades = factory.create_trade_history(
            1,
            [10, 10, 10, 11],
            [100, 100, 100, 100],
            onesec,
            self.sim_params
        )
        txn = create_txn(trades[1], 10.0, 100)
        pp = perf.PerformancePeriod(1000.0)

        pp.execute_transaction(txn)
        for trade in trades:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        self.assertEqual(
            pp.period_cash_flow,
            -1 * txn.price * txn.amount,
            "capital used should be equal to the opposite of the "
            "transaction cost of sole txn in test"
        )
        self._assert_sole_position(
            pp, txn.sid, txn.amount, txn.price, trades[-1]['price'])
        self.assertEqual(
            pp.ending_value,
            1100,
            "ending value should be price of last trade times number of "
            "shares in position"
        )
        self.assertEqual(pp.pnl, 100, "gain of 1 on 100 shares should be 100")

    def test_short_position(self):
        """
        verify that the performance period calculates properly for a
        single short-sale transaction
        """
        trades = factory.create_trade_history(
            1,
            [10, 10, 10, 11, 10, 9],
            [100, 100, 100, 100, 100, 100],
            onesec,
            self.sim_params
        )
        trades_1 = trades[:-2]

        txn = create_txn(trades[1], 10.0, -100)
        pp = perf.PerformancePeriod(1000.0)

        pp.execute_transaction(txn)
        for trade in trades_1:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        self.assertEqual(
            pp.period_cash_flow,
            -1 * txn.price * txn.amount,
            "capital used should be equal to the opposite of the "
            "transaction cost of sole txn in test"
        )
        self._assert_sole_position(
            pp, txn.sid, -100, txn.price, trades_1[-1]['price'])
        self.assertEqual(
            pp.ending_value,
            -1100,
            "ending value should be price of last trade times number of "
            "shares in position"
        )
        self.assertEqual(
            pp.pnl,
            -100,
            "rise of 1 on -100 shares should be -100"
        )

        # simulate additional trades, and ensure that the position value
        # reflects the new price
        trades_2 = trades[-2:]

        # simulate a rollover to a new period
        pp.rollover()

        for trade in trades_2:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        self.assertEqual(
            pp.period_cash_flow,
            0,
            "capital used should be zero, there were no transactions in "
            "performance period"
        )
        self._assert_sole_position(
            pp, txn.sid, -100, txn.price, trades_2[-1].price)
        self.assertEqual(
            pp.ending_value,
            -900,
            "ending value should be price of last trade times number of "
            "shares in position"
        )
        self.assertEqual(
            pp.pnl,
            200,
            "drop of 2 on -100 shares should be 200"
        )

        # now run a performance period encompassing the entire trade sample.
        pp_total = perf.PerformancePeriod(1000.0)

        for trade in trades_1:
            pp_total.update_last_sale(trade)
        pp_total.execute_transaction(txn)
        for trade in trades_2:
            pp_total.update_last_sale(trade)
        pp_total.calculate_performance()

        self.assertEqual(
            pp_total.period_cash_flow,
            -1 * txn.price * txn.amount,
            "capital used should be equal to the opposite of the "
            "transaction cost of sole txn in test"
        )
        self._assert_sole_position(
            pp_total, txn.sid, -100, txn.price, trades_2[-1].price)
        self.assertEqual(
            pp_total.ending_value,
            -900,
            "ending value should be price of last trade times number of "
            "shares in position"
        )
        self.assertEqual(
            pp_total.pnl,
            100,
            "drop of 1 on -100 shares should be 100"
        )

    def test_covering_short(self):
        """
        verify performance where a short is sold and then covered, and
        shares trade after the cover
        """
        trades = factory.create_trade_history(
            1,
            [10, 10, 10, 11, 9, 8, 7, 8, 9, 10],
            [100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
            onesec,
            self.sim_params
        )
        short_txn = create_txn(
            trades[1],
            10.0,
            -100,
        )
        cover_txn = create_txn(trades[6], 7.0, 100)
        pp = perf.PerformancePeriod(1000.0)

        pp.execute_transaction(short_txn)
        pp.execute_transaction(cover_txn)
        for trade in trades:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        short_txn_cost = short_txn.price * short_txn.amount
        cover_txn_cost = cover_txn.price * cover_txn.amount

        self.assertEqual(
            pp.period_cash_flow,
            -1 * short_txn_cost - cover_txn_cost,
            "capital used should be equal to the net transaction costs"
        )
        # A covered position is flat: no shares and a cost basis of 0.
        self._assert_sole_position(
            pp, short_txn.sid, 0, 0, trades[-1].price)
        self.assertEqual(
            pp.ending_value,
            0,
            "ending value should be price of last trade times number of "
            "shares in position"
        )
        self.assertEqual(
            pp.pnl,
            300,
            "gain of 3 on 100 shares should be 300"
        )

    def test_cost_basis_calc(self):
        history_args = (
            1,
            [10, 11, 11, 12],
            [100, 100, 100, 100],
            onesec,
            self.sim_params
        )
        trades = factory.create_trade_history(*history_args)
        transactions = factory.create_txn_history(*history_args)

        pp = perf.PerformancePeriod(1000.0)
        self._execute_checking_average_cost(pp, transactions)

        for trade in trades:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        self.assertEqual(
            pp.positions[1].last_sale_price,
            trades[-1].price,
            "should have a last sale of 12, got {val}".format(
                val=pp.positions[1].last_sale_price)
        )
        self.assertEqual(
            pp.positions[1].cost_basis,
            11,
            "should have a cost basis of 11"
        )
        self.assertEqual(
            pp.pnl,
            400
        )

        down_tick = factory.create_trade(
            1,
            10.0,
            100,
            trades[-1].dt + onesec)
        sale_txn = create_txn(
            down_tick,
            10.0,
            -100)

        pp.rollover()
        pp.execute_transaction(sale_txn)
        pp.update_last_sale(down_tick)
        pp.calculate_performance()

        self.assertEqual(
            pp.positions[1].last_sale_price,
            10,
            "should have a last sale of 10, was {val}".format(
                val=pp.positions[1].last_sale_price)
        )
        self.assertEqual(
            pp.positions[1].cost_basis,
            11,
            "should have a cost basis of 11"
        )
        self.assertEqual(pp.pnl, -800, "this period goes from +400 to -400")

        # Same transactions and trades again, all inside a single period.
        pp3 = perf.PerformancePeriod(1000.0)
        self._execute_checking_average_cost(pp3, transactions)
        pp3.execute_transaction(sale_txn)

        trades.append(down_tick)
        for trade in trades:
            pp3.update_last_sale(trade)
        pp3.calculate_performance()

        self.assertEqual(
            pp3.positions[1].last_sale_price,
            10,
            "should have a last sale of 10"
        )
        self.assertEqual(
            pp3.positions[1].cost_basis,
            11,
            "should have a cost basis of 11"
        )
        self.assertEqual(
            pp3.pnl,
            -400,
            "should be -400 for all trades and transactions in period"
        )

    def test_cost_basis_calc_close_pos(self):
        history_args = (
            1,
            [10, 9, 11, 8, 9, 12, 13, 14],
            [200, -100, -100, 100, -300, 100, 500, 400],
            onesec,
            self.sim_params
        )
        # Expected cost basis after each transaction, in order.
        cost_bases = [10, 10, 0, 8, 9, 9, 13, 13.5]

        trades = factory.create_trade_history(*history_args)
        transactions = factory.create_txn_history(*history_args)

        pp = perf.PerformancePeriod(1000.0)
        for txn, cb in zip(transactions, cost_bases):
            pp.execute_transaction(txn)
            self.assertEqual(pp.positions[1].cost_basis, cb)

        for trade in trades:
            pp.update_last_sale(trade)
        pp.calculate_performance()

        self.assertEqual(pp.positions[1].cost_basis, cost_bases[-1])
class TestPerformanceTracker(unittest.TestCase):
    """
    Feeds trades, orders, transactions and benchmark events through
    perf.PerformanceTracker in daily and minute emission modes.
    """

    # How many days of trade events to drop from each part of the data set.
    NumDaysToDelete = collections.namedtuple(
        'NumDaysToDelete', ('start', 'middle', 'end'))

    @parameterized.expand([
        ("Don't delete any events",
         NumDaysToDelete(start=0, middle=0, end=0)),
        ("Delete first day of events",
         NumDaysToDelete(start=1, middle=0, end=0)),
        ("Delete first two days of events",
         NumDaysToDelete(start=2, middle=0, end=0)),
        ("Delete one day of events from the middle",
         NumDaysToDelete(start=0, middle=1, end=0)),
        ("Delete two events from the middle",
         NumDaysToDelete(start=0, middle=2, end=0)),
        ("Delete last day of events",
         NumDaysToDelete(start=0, middle=0, end=1)),
        ("Delete last two days of events",
         NumDaysToDelete(start=0, middle=0, end=2)),
        ("Delete all but one event.",
         NumDaysToDelete(start=2, middle=1, end=2)),
    ])
    def test_tracker(self, parameter_comment, days_to_delete):
        """
        @days_to_delete - configures which days in the data set we should
        remove, used for ensuring that we still return performance messages
        even when there is no data.
        """
        # This date range covers Columbus day,
        # however Columbus day is not a market holiday
        #
        #     October 2008
        # Su Mo Tu We Th Fr Sa
        #           1  2  3  4
        #  5  6  7  8  9 10 11
        # 12 13 14 15 16 17 18
        # 19 20 21 22 23 24 25
        # 26 27 28 29 30 31
        start_dt = datetime.datetime(year=2008,
                                     month=10,
                                     day=9,
                                     tzinfo=pytz.utc)
        end_dt = datetime.datetime(year=2008,
                                   month=10,
                                   day=16,
                                   tzinfo=pytz.utc)

        trade_count = 6
        sid = 133
        price = 10.1
        price_list = [price] * trade_count
        volume = [100] * trade_count
        trade_time_increment = datetime.timedelta(days=1)

        sim_params = SimulationParameters(
            period_start=start_dt,
            period_end=end_dt
        )

        benchmark_events = benchmark_events_in_range(sim_params)

        trade_history = factory.create_trade_history(
            sid,
            price_list,
            volume,
            trade_time_increment,
            sim_params,
            source_id="factory1"
        )

        sid2 = 134
        price2 = 12.12
        price2_list = [price2] * trade_count
        trade_history2 = factory.create_trade_history(
            sid2,
            price2_list,
            volume,
            trade_time_increment,
            sim_params,
            source_id="factory2"
        )

        # 'middle' start of 3 depends on number of days == 7
        middle = 3

        # The same days are removed from both histories. The guards matter:
        # in particular `del history[-0:]` would empty the whole list.
        for history in (trade_history, trade_history2):
            # First delete from middle
            if days_to_delete.middle:
                del history[middle:(middle + days_to_delete.middle)]
            # Delete start
            if days_to_delete.start:
                del history[:days_to_delete.start]
            # Delete from end
            if days_to_delete.end:
                del history[-days_to_delete.end:]

        sim_params.first_open = \
            sim_params.calculate_first_open()
        sim_params.last_close = \
            sim_params.calculate_last_close()
        sim_params.capital_base = 1000.0
        sim_params.frame_index = [
            'sid',
            'volume',
            'dt',
            'price',
            'changed']
        perf_tracker = perf.PerformanceTracker(
            sim_params
        )

        events = date_sorted_sources(trade_history, trade_history2)

        events = list(self.trades_with_txns(events, trade_history[0].dt))

        # Extract events with transactions to use for verification.
        txns = [event for event in
                events if event.type == DATASOURCE_TYPE.TRANSACTION]

        orders = [event for event in
                  events if event.type == DATASOURCE_TYPE.ORDER]

        all_events = date_sorted_sources(events, benchmark_events)

        filtered_events = [filt_event for filt_event
                           in all_events if filt_event.dt <= end_dt]
        filtered_events.sort(key=lambda x: x.dt)
        grouped_events = itertools.groupby(filtered_events, lambda x: x.dt)

        # One market close, and therefore one message, per distinct dt.
        perf_messages = []
        for _, group in grouped_events:
            for event in group:
                perf_tracker.process_event(event)
            msg = perf_tracker.handle_market_close()
            perf_messages.append(msg)

        self.assertEqual(perf_tracker.txn_count, len(txns))
        self.assertEqual(perf_tracker.txn_count, len(orders))

        cumulative_pos = perf_tracker.cumulative_performance.positions[sid]
        # Transactions are generated for both sids, -25 shares each, so
        # half of them belong to `sid`.
        expected_size = len(txns) / 2 * -25
        self.assertEqual(cumulative_pos.amount, expected_size)

        self.assertEqual(len(perf_messages),
                         sim_params.days_in_period)

    def trades_with_txns(self, events, no_txn_dt):
        """
        Yield `events`, surrounding each one whose dt differs from
        `no_txn_dt` with a mock order (before) and a mock transaction
        (after) of -25 shares on the event's sid.
        """
        for event in events:

            # create a transaction for all but
            # first trade in each sid, to simulate None transaction
            if event.dt != no_txn_dt:
                order = Order(
                    sid=event.sid,
                    amount=-25,
                    dt=event.dt
                )
                order.source_id = 'MockOrderSource'
                yield order
                yield event
                txn = Transaction(
                    sid=event.sid,
                    amount=-25,
                    dt=event.dt,
                    price=10.0,
                    commission=0.50,
                    order_id=order.id
                )
                txn.source_id = 'MockTransactionSource'
                yield txn
            else:
                yield event

    def test_minute_tracker(self):
        """ Tests minute performance tracking."""
        with trading.TradingEnvironment():
            start_dt = trading.environment.exchange_dt_in_utc(
                datetime.datetime(2013, 3, 1, 9, 31))
            end_dt = trading.environment.exchange_dt_in_utc(
                datetime.datetime(2013, 3, 1, 16, 0))

            sim_params = SimulationParameters(
                period_start=start_dt,
                period_end=end_dt,
                emission_rate='minute'
            )
            tracker = perf.PerformanceTracker(sim_params)

            next_minute = start_dt + datetime.timedelta(minutes=1)

            foo_event_1 = factory.create_trade('foo', 10.0, 20, start_dt)
            order_event_1 = Order(sid=foo_event_1.sid,
                                  amount=-25,
                                  dt=foo_event_1.dt)
            bar_event_1 = factory.create_trade('bar', 100.0, 200, start_dt)
            txn_event_1 = Transaction(sid=foo_event_1.sid,
                                      amount=-25,
                                      dt=foo_event_1.dt,
                                      price=10.0,
                                      commission=0.50,
                                      order_id=order_event_1.id)
            benchmark_event_1 = Event({
                'dt': start_dt,
                'returns': 0.01,
                'type': DATASOURCE_TYPE.BENCHMARK
            })

            foo_event_2 = factory.create_trade(
                'foo', 11.0, 20, next_minute)
            bar_event_2 = factory.create_trade(
                'bar', 11.0, 20, next_minute)
            benchmark_event_2 = Event({
                'dt': next_minute,
                'returns': 0.02,
                'type': DATASOURCE_TYPE.BENCHMARK
            })

            events = [
                foo_event_1,
                order_event_1,
                benchmark_event_1,
                txn_event_1,
                bar_event_1,
                foo_event_2,
                benchmark_event_2,
                bar_event_2,
            ]

            grouped_events = itertools.groupby(
                events, operator.attrgetter('dt'))

            messages = {}
            for date, group in grouped_events:
                tracker.set_date(date)
                for event in group:
                    tracker.process_event(event)
                tracker.handle_minute_close(date)
                msg = tracker.to_dict()
                messages[date] = msg

            self.assertEqual(2, len(messages))

            msg_1 = messages[foo_event_1.dt]
            msg_2 = messages[foo_event_2.dt]

            self.assertEqual(1, len(msg_1['minute_perf']['transactions']),
                             "The first message should contain one "
                             "transaction.")
            # Check that transactions aren't emitted for previous events.
            self.assertEqual(0, len(msg_2['minute_perf']['transactions']),
                             "The second message should have no "
                             "transactions.")

            self.assertEqual(1, len(msg_1['minute_perf']['orders']),
                             "The first message should contain one orders.")
            # Check that orders aren't emitted for previous events.
            self.assertEqual(0, len(msg_2['minute_perf']['orders']),
                             "The second message should have no orders.")

            # Ensure that period_close moves through time.
            # Also, ensure that the period_closes are the expected dts.
            self.assertEqual(foo_event_1.dt,
                             msg_1['minute_perf']['period_close'])
            self.assertEqual(foo_event_2.dt,
                             msg_2['minute_perf']['period_close'])

            # Ensure that a Sharpe value for cumulative metrics is being
            # created.
            self.assertIsNotNone(msg_1['cumulative_risk_metrics']['sharpe'])
            self.assertIsNotNone(msg_2['cumulative_risk_metrics']['sharpe'])