Files
catalyst/tests/test_perf_tracking.py
2017-06-19 14:43:10 -07:00

2171 lines
76 KiB
Python

#
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import copy
from datetime import (
datetime,
timedelta,
)
import logging
import nose.tools as nt
import pytz
import pandas as pd
import numpy as np
from six.moves import range, zip
from catalyst.assets import Asset
from catalyst.assets.synthetic import make_simple_equity_info
from catalyst.data.us_equity_pricing import (
SQLiteAdjustmentWriter,
SQLiteAdjustmentReader,
)
import catalyst.utils.factory as factory
import catalyst.finance.performance as perf
from catalyst.finance.transaction import create_transaction
import catalyst.utils.math_utils as zp_math
from catalyst.finance.blotter import Order
from catalyst.finance.performance.position import Position
from catalyst.utils.factory import create_simulation_parameters
from catalyst.utils.serialization_utils import (
loads_with_persistent_ids, dumps_with_persistent_ids
)
from catalyst.testing import (
MockDailyBarReader,
create_data_portal_from_trade_history,
create_empty_splits_mergers_frame,
tmp_trading_env,
)
from catalyst.testing.fixtures import (
WithInstanceTmpDir,
WithSimParams,
WithTmpDir,
WithTradingEnvironment,
ZiplineTestCase,
)
from catalyst.utils.calendars import get_calendar
logger = logging.getLogger('Test Perf Tracking')
oneday = timedelta(days=1)
tradingday = timedelta(hours=6, minutes=30)
# nose.tools changed name in python 3
if not hasattr(nt, 'assert_count_equal'):
nt.assert_count_equal = nt.assert_items_equal
def check_perf_period(pp,
gross_leverage,
net_leverage,
long_exposure,
longs_count,
short_exposure,
shorts_count):
perf_data = pp.to_dict()
np.testing.assert_allclose(
gross_leverage, perf_data['gross_leverage'], rtol=1e-3)
np.testing.assert_allclose(
net_leverage, perf_data['net_leverage'], rtol=1e-3)
np.testing.assert_allclose(
long_exposure, perf_data['long_exposure'], rtol=1e-3)
np.testing.assert_allclose(
longs_count, perf_data['longs_count'], rtol=1e-3)
np.testing.assert_allclose(
short_exposure, perf_data['short_exposure'], rtol=1e-3)
np.testing.assert_allclose(
shorts_count, perf_data['shorts_count'], rtol=1e-3)
def check_account(account,
settled_cash,
equity_with_loan,
total_positions_value,
total_positions_exposure,
regt_equity,
available_funds,
excess_liquidity,
cushion,
leverage,
net_leverage,
net_liquidation):
# this is a long only portfolio that is only partially invested
# so net and gross leverage are equal.
np.testing.assert_allclose(settled_cash,
account.settled_cash, rtol=1e-3)
np.testing.assert_allclose(equity_with_loan,
account.equity_with_loan, rtol=1e-3)
np.testing.assert_allclose(total_positions_value,
account.total_positions_value, rtol=1e-3)
np.testing.assert_allclose(total_positions_exposure,
account.total_positions_exposure, rtol=1e-3)
np.testing.assert_allclose(regt_equity,
account.regt_equity, rtol=1e-3)
np.testing.assert_allclose(available_funds,
account.available_funds, rtol=1e-3)
np.testing.assert_allclose(excess_liquidity,
account.excess_liquidity, rtol=1e-3)
np.testing.assert_allclose(cushion,
account.cushion, rtol=1e-3)
np.testing.assert_allclose(leverage, account.leverage, rtol=1e-3)
np.testing.assert_allclose(net_leverage,
account.net_leverage, rtol=1e-3)
np.testing.assert_allclose(net_liquidation,
account.net_liquidation, rtol=1e-3)
def create_txn(asset, dt, price, amount):
"""
Create a fake transaction to be filled and processed prior to the execution
of a given trade event.
"""
if not isinstance(asset, Asset):
raise ValueError("pass an asset to create_txn")
mock_order = Order(dt, asset, amount, id=None)
return create_transaction(mock_order, dt, price, amount)
def calculate_results(sim_params,
env,
data_portal,
splits=None,
txns=None,
commissions=None):
"""
Run the given events through a stripped down version of the loop in
AlgorithmSimulator.transform.
IMPORTANT NOTE FOR TEST WRITERS/READERS:
This loop has some wonky logic for the order of event processing for
datasource types. This exists mostly to accommodate legacy tests that were
making assumptions about how events would be sorted.
In particular:
- Dividends passed for a given date are processed PRIOR to any events
for that date.
- Splits passed for a given date are process AFTER any events for that
date.
Tests that use this helper should not be considered useful guarantees of
the behavior of AlgorithmSimulator on a stream containing the same events
unless the subgroups have been explicitly re-sorted in this way.
"""
txns = txns or []
splits = splits or {}
commissions = commissions or {}
perf_tracker = perf.PerformanceTracker(
sim_params, get_calendar("NYSE"), env
)
results = []
for date in sim_params.sessions:
for txn in filter(lambda txn: txn.dt == date, txns):
# Process txns for this date.
perf_tracker.process_transaction(txn)
try:
commissions_for_date = commissions[date]
for comm in commissions_for_date:
perf_tracker.process_commission(comm)
except KeyError:
pass
try:
splits_for_date = splits[date]
perf_tracker.handle_splits(splits_for_date)
except KeyError:
pass
msg = perf_tracker.handle_market_close(date, data_portal)
perf_tracker.position_tracker.sync_last_sale_prices(
date, False, data_portal,
)
msg['account'] = perf_tracker.get_account(True)
results.append(copy.deepcopy(msg))
return results
def check_perf_tracker_serialization(perf_tracker):
scalar_keys = [
'emission_rate',
'txn_count',
'market_open',
'last_close',
'start_session',
'day_count',
'capital_base',
'market_close',
'saved_dt',
'period_end',
'total_days',
]
p_string = dumps_with_persistent_ids(perf_tracker)
test = loads_with_persistent_ids(p_string, env=perf_tracker.env)
for k in scalar_keys:
nt.assert_equal(getattr(test, k), getattr(perf_tracker, k), k)
perf_periods = (
test.cumulative_performance,
test.todays_performance
)
for period in perf_periods:
nt.assert_true(hasattr(period, '_position_tracker'))
def setup_env_data(env, sim_params, sids, futures_sids=[]):
data = {}
for sid in sids:
data[sid] = {
"start_date": sim_params.sessions[0],
"end_date": get_calendar("NYSE").next_session_label(
sim_params.sessions[-1]
)
}
env.write_data(equities_data=data)
futures_data = {}
for future_sid in futures_sids:
futures_data[future_sid] = {
"start_date": sim_params.sessions[0],
# (obviously) FIXME once we have a future calendar
"end_date": get_calendar("NYSE").next_session_label(
sim_params.sessions[-1]
),
"multiplier": 100
}
env.write_data(futures_data=futures_data)
class TestSplitPerformance(WithSimParams, WithTmpDir, ZiplineTestCase):
START_DATE = pd.Timestamp('2006-01-03', tz='utc')
END_DATE = pd.Timestamp('2006-01-04', tz='utc')
SIM_PARAMS_CAPITAL_BASE = 10e3
ASSET_FINDER_EQUITY_SIDS = 1, 2
@classmethod
def init_class_fixtures(cls):
super(TestSplitPerformance, cls).init_class_fixtures()
cls.asset1 = cls.env.asset_finder.retrieve_asset(1)
cls.asset2 = cls.env.asset_finder.retrieve_asset(2)
def test_multiple_splits(self):
# if multiple positions all have splits at the same time, verify that
# the total leftover cash is correct
perf_tracker = perf.PerformanceTracker(self.sim_params,
self.trading_calendar,
self.env)
perf_tracker.position_tracker.positions[1] = \
Position(self.asset1, amount=10, cost_basis=10, last_sale_price=11)
perf_tracker.position_tracker.positions[2] = \
Position(self.asset2, amount=10, cost_basis=10, last_sale_price=11)
leftover_cash = perf_tracker.position_tracker.handle_splits(
[(self.asset1, 0.333), (self.asset2, 0.333)]
)
# we used to have 10 shares that each cost us $10, total $100
# now we have 33 shares that each cost us $3.33, total $99.9
# each position returns $0.10 as leftover cash
self.assertEqual(0.2, leftover_cash)
def test_split_long_position(self):
events = factory.create_trade_history(
self.asset1,
# TODO: Should we provide adjusted prices in the tests, or provide
# raw prices and adjust via DataPortal?
[20, 60],
[100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
# set up a long position in sid 1
# 100 shares at $20 apiece = $2000 position
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.tmpdir,
self.sim_params,
{1: events},
)
txns = [create_txn(self.asset1, events[0].dt, 20, 100)]
# set up a split with ratio 3 occurring at the start of the second
# day.
splits = {
events[1].dt: [(self.asset1, 3)]
}
results = calculate_results(self.sim_params,
self.env,
data_portal,
txns=txns,
splits=splits)
# should have 33 shares (at $60 apiece) and $20 in cash
self.assertEqual(2, len(results))
latest_positions = results[1]['daily_perf']['positions']
self.assertEqual(1, len(latest_positions))
# check the last position to make sure it's been updated
position = latest_positions[0]
self.assertEqual(self.asset1, position['sid'])
self.assertEqual(33, position['amount'])
self.assertEqual(60, position['cost_basis'])
self.assertEqual(60, position['last_sale_price'])
# since we started with $10000, and we spent $2000 on the
# position, but then got $20 back, we should have $8020
# (or close to it) in cash.
# we won't get exactly 8020 because sometimes a split is
# denoted as a ratio like 0.3333, and we lose some digits
# of precision. thus, make sure we're pretty close.
daily_perf = results[1]['daily_perf']
self.assertTrue(
zp_math.tolerant_equals(8020,
daily_perf['ending_cash'], 1),
"ending_cash was {0}".format(daily_perf['ending_cash']))
# Validate that the account attributes were updated.
account = results[1]['account']
self.assertEqual(float('inf'), account.day_trades_remaining)
# this is a long only portfolio that is only partially invested
# so net and gross leverage are equal.
np.testing.assert_allclose(0.198, account.leverage, rtol=1e-3)
np.testing.assert_allclose(0.198, account.net_leverage, rtol=1e-3)
np.testing.assert_allclose(8020, account.regt_equity, rtol=1e-3)
self.assertEqual(float('inf'), account.regt_margin)
np.testing.assert_allclose(8020, account.available_funds, rtol=1e-3)
self.assertEqual(0, account.maintenance_margin_requirement)
np.testing.assert_allclose(10000,
account.equity_with_loan, rtol=1e-3)
self.assertEqual(float('inf'), account.buying_power)
self.assertEqual(0, account.initial_margin_requirement)
np.testing.assert_allclose(8020, account.excess_liquidity,
rtol=1e-3)
np.testing.assert_allclose(8020, account.settled_cash, rtol=1e-3)
np.testing.assert_allclose(10000, account.net_liquidation,
rtol=1e-3)
np.testing.assert_allclose(0.802, account.cushion, rtol=1e-3)
np.testing.assert_allclose(1980, account.total_positions_value,
rtol=1e-3)
self.assertEqual(0, account.accrued_interest)
for i, result in enumerate(results):
for perf_kind in ('daily_perf', 'cumulative_perf'):
perf_result = result[perf_kind]
# prices aren't changing, so pnl and returns should be 0.0
self.assertEqual(0.0, perf_result['pnl'],
"day %s %s pnl %s instead of 0.0" %
(i, perf_kind, perf_result['pnl']))
self.assertEqual(0.0, perf_result['returns'],
"day %s %s returns %s instead of 0.0" %
(i, perf_kind, perf_result['returns']))
class TestDividendPerformance(WithSimParams,
WithInstanceTmpDir,
ZiplineTestCase):
START_DATE = pd.Timestamp('2006-01-03', tz='utc')
END_DATE = pd.Timestamp('2006-01-10', tz='utc')
ASSET_FINDER_EQUITY_SIDS = 1, 2
SIM_PARAMS_CAPITAL_BASE = 10e3
@classmethod
def init_class_fixtures(cls):
super(TestDividendPerformance, cls).init_class_fixtures()
cls.asset1 = cls.asset_finder.retrieve_asset(1)
cls.asset2 = cls.asset_finder.retrieve_asset(2)
def test_market_hours_calculations(self):
# DST in US/Eastern began on Sunday March 14, 2010
before = datetime(2010, 3, 12, 14, 31, tzinfo=pytz.utc)
after = factory.get_next_trading_dt(
before,
timedelta(days=1),
self.trading_calendar,
)
self.assertEqual(after.hour, 13)
def test_long_position_receives_dividend(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[2].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
# Simulate a transaction being filled prior to the ex_date.
txns = [create_txn(self.asset1, events[0].dt, 10.0, 100)]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.1, 0.1, 0.1, 0.1])
daily_returns = [event['daily_perf']['returns']
for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, 0.10, 0.0, 0.0, 0.0])
cash_flows = [event['daily_perf']['capital_used']
for event in results]
self.assertEqual(cash_flows, [-1000, 0, 1000, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [-1000, -1000, 0, 0, 0, 0])
cash_pos = \
[event['cumulative_perf']['ending_cash'] for event in results]
self.assertEqual(cash_pos, [9000, 9000, 10000, 10000, 10000, 10000])
def test_long_position_receives_stock_dividend(self):
# post some trades in the market
events = {}
for asset in [self.asset1, self.asset2]:
events[asset.sid] = factory.create_trade_history(
asset,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([], dtype=np.uint32),
'amount': np.array([], dtype=np.float64),
'declared_date': np.array([], dtype='datetime64[ns]'),
'ex_date': np.array([], dtype='datetime64[ns]'),
'pay_date': np.array([], dtype='datetime64[ns]'),
'record_date': np.array([], dtype='datetime64[ns]'),
})
sid_1 = events[1]
stock_dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'payment_sid': np.array([2], dtype=np.uint32),
'ratio': np.array([2], dtype=np.float64),
'declared_date': np.array([sid_1[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([sid_1[1].dt], dtype='datetime64[ns]'),
'record_date': np.array([sid_1[1].dt], dtype='datetime64[ns]'),
'pay_date': np.array([sid_1[2].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends, stock_dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
events,
)
data_portal._adjustment_reader = adjustment_reader
txns = [create_txn(self.asset1, events[1][0].dt, 10.0, 100)]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.2, 0.2, 0.2, 0.2])
daily_returns = [event['daily_perf']['returns']
for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, 0.2, 0.0, 0.0, 0.0])
cash_flows = [event['daily_perf']['capital_used']
for event in results]
self.assertEqual(cash_flows, [-1000, 0, 0, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [-1000] * 6)
cash_pos = \
[event['cumulative_perf']['ending_cash'] for event in results]
self.assertEqual(cash_pos, [9000] * 6)
def test_long_position_purchased_on_ex_date_receives_no_dividend(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[2].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
# Simulate a transaction being filled on the ex_date.
txns = [create_txn(self.asset1, events[1].dt, 10.0, 100)]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0, 0, 0, 0, 0, 0])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0, 0, 0, 0, 0, 0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [0, -1000, 0, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows,
[0, -1000, -1000, -1000, -1000, -1000])
def test_selling_before_dividend_payment_still_gets_paid(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[3].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
buy_txn = create_txn(self.asset1, events[0].dt, 10.0, 100)
sell_txn = create_txn(self.asset1, events[2].dt, 10.0, -100)
txns = [buy_txn, sell_txn]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0, 0, 0, 0.1, 0.1, 0.1])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0, 0, 0, 0.1, 0, 0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [-1000, 0, 1000, 1000, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows,
[-1000, -1000, 0, 1000, 1000, 1000])
def test_buy_and_sell_before_ex(self):
# need a six-day simparam
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.0], dtype=np.float64),
'declared_date': np.array([events[3].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[4].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[5].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[4].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
buy_txn = create_txn(self.asset1, events[1].dt, 10.0, 100)
sell_txn = create_txn(self.asset1, events[2].dt, 10.0, -100)
txns = [buy_txn, sell_txn]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0, 0, 0, 0, 0, 0])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0, 0, 0, 0, 0, 0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [0, -1000, 1000, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [0, -1000, 0, 0, 0, 0])
def test_ending_before_pay_date(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
pay_date = self.sim_params.first_open
# find pay date that is much later.
for i in range(30):
pay_date = factory.get_next_trading_dt(pay_date, oneday,
self.trading_calendar)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'pay_date': np.array([pay_date], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
txns = [create_txn(self.asset1, events[1].dt, 10.0, 100)]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0, 0, 0, 0.0, 0.0, 0.0])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0, 0, 0, 0, 0, 0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [0, -1000, 0, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(
cumulative_cash_flows,
[0, -1000, -1000, -1000, -1000, -1000]
)
def test_short_position_pays_dividend(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[2].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[2].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[3].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
txns = [create_txn(self.asset1, events[1].dt, 10.0, -100)]
results = calculate_results(
self.sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.0, -0.1, -0.1, -0.1])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, 0.0, -0.1, 0.0, 0.0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [0, 1000, 0, -1000, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [0, 1000, 1000, 0, 0, 0])
def test_no_position_receives_no_dividend(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[1].dt], dtype='datetime64[ns]'),
'pay_date': np.array([events[2].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[2].dt], dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
results = calculate_results(
self.sim_params,
self.env,
data_portal,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [0, 0, 0, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [0, 0, 0, 0, 0, 0])
def test_no_dividend_at_simulation_end(self):
# post some trades in the market
events = factory.create_trade_history(
self.asset1,
[10, 10, 10, 10, 10, 10],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
dbpath = self.instance_tmpdir.getpath('adjustments.sqlite')
writer = SQLiteAdjustmentWriter(
dbpath,
MockDailyBarReader(),
self.trading_calendar.all_sessions,
)
splits = mergers = create_empty_splits_mergers_frame()
dividends = pd.DataFrame({
'sid': np.array([1], dtype=np.uint32),
'amount': np.array([10.00], dtype=np.float64),
'declared_date': np.array([events[-3].dt], dtype='datetime64[ns]'),
'ex_date': np.array([events[-2].dt], dtype='datetime64[ns]'),
'record_date': np.array([events[0].dt], dtype='datetime64[ns]'),
'pay_date': np.array(
[self.trading_calendar.next_session_label(
self.trading_calendar.minute_to_session_label(
events[-1].dt
)
)],
dtype='datetime64[ns]'),
})
writer.write(splits, mergers, dividends)
adjustment_reader = SQLiteAdjustmentReader(dbpath)
# Set the last day to be the last event
sim_params = create_simulation_parameters(
num_days=6,
capital_base=10e3,
start=self.sim_params.start_session,
end=self.sim_params.end_session
)
sim_params = sim_params.create_new(
sim_params.start_session,
events[-1].dt
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
sim_params,
{1: events},
)
data_portal._adjustment_reader = adjustment_reader
# Simulate a transaction being filled prior to the ex_date.
txns = [create_txn(self.asset1, events[0].dt, 10.0, 100)]
results = calculate_results(
sim_params,
self.env,
data_portal,
txns=txns,
)
self.assertEqual(len(results), 6)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [-1000, 0, 0, 0, 0, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows,
[-1000, -1000, -1000, -1000, -1000, -1000])
class TestDividendPerformanceHolidayStyle(TestDividendPerformance):
# The holiday tests begins the simulation on the day
# before Thanksgiving, so that the next trading day is
# two days ahead. Any tests that hard code events
# to be start + oneday will fail, since those events will
# be skipped by the simulation.
START_DATE = pd.Timestamp('2003-11-30', tz='utc')
END_DATE = pd.Timestamp('2003-12-08', tz='utc')
class TestPositionPerformance(WithInstanceTmpDir,
WithTradingEnvironment,
ZiplineTestCase):
def create_environment_stuff(self,
num_days=4,
sids=[1, 2],
futures_sids=[3]):
start = pd.Timestamp('2006-01-01', tz='utc')
end = start + timedelta(days=num_days * 2)
equities = make_simple_equity_info(sids, start, end)
futures = pd.DataFrame.from_dict(
{
sid: {
'start_date': start,
'end_date': end,
'multiplier': 100,
'exchange': "TEST",
}
for sid in futures_sids
},
orient='index',
)
self.env = self.enter_instance_context(tmp_trading_env(
equities=equities,
futures=futures,
load=self.make_load_function(),
))
self.sim_params = create_simulation_parameters(
start=start,
num_days=num_days,
)
self.finder = self.env.asset_finder
self.asset1 = self.env.asset_finder.retrieve_asset(1)
self.asset2 = self.env.asset_finder.retrieve_asset(2)
self.asset3 = self.env.asset_finder.retrieve_asset(3)
def test_long_short_positions(self):
"""
start with $1000
buy 100 stock1 shares at $10
sell short 100 stock2 shares at $10
stock1 then goes down to $9
stock2 goes to $11
"""
self.create_environment_stuff()
trades_1 = factory.create_trade_history(
self.asset1,
[10, 10, 10, 9],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
trades_2 = factory.create_trade_history(
self.asset2,
[10, 10, 10, 11],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades_1, 2: trades_2}
)
txn1 = create_txn(self.asset1, trades_1[0].dt, 10.0, 100)
txn2 = create_txn(self.asset2, trades_1[0].dt, 10.0, -100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp.position_tracker = pt
pt.execute_transaction(txn1)
pp.handle_execution(txn1)
pt.execute_transaction(txn2)
pp.handle_execution(txn2)
dt = trades_1[-2].dt
pt.sync_last_sale_prices(dt, False, data_portal)
pp.calculate_performance()
check_perf_period(
pp,
gross_leverage=2.0,
net_leverage=0.0,
long_exposure=1000.0,
longs_count=1,
short_exposure=-1000.0,
shorts_count=1)
# Validate that the account attributes were updated.
account = pp.as_account()
check_account(account,
settled_cash=1000.0,
equity_with_loan=1000.0,
total_positions_value=0.0,
total_positions_exposure=0.0,
regt_equity=1000.0,
available_funds=1000.0,
excess_liquidity=1000.0,
cushion=1.0,
leverage=2.0,
net_leverage=0.0,
net_liquidation=1000.0)
dt = trades_1[-1].dt
pt.sync_last_sale_prices(dt, False, data_portal)
pp.calculate_performance()
# Validate that the account attributes were updated.
account = pp.as_account()
check_perf_period(
pp,
gross_leverage=2.5,
net_leverage=-0.25,
long_exposure=900.0,
longs_count=1,
short_exposure=-1100.0,
shorts_count=1)
check_account(account,
settled_cash=1000.0,
equity_with_loan=800.0,
total_positions_value=-200.0,
total_positions_exposure=-200.0,
regt_equity=1000.0,
available_funds=1000.0,
excess_liquidity=1000.0,
cushion=1.25,
leverage=2.5,
net_leverage=-0.25,
net_liquidation=800.0)
def test_levered_long_position(self):
"""
start with $1,000, then buy 1000 shares at $10.
price goes to $11
"""
# post some trades in the market
self.create_environment_stuff()
trades = factory.create_trade_history(
self.asset1,
[10, 10, 10, 11],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
txn = create_txn(self.asset1, trades[1].dt, 10.0, 1000)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp.position_tracker = pt
pt.execute_transaction(txn)
pp.handle_execution(txn)
pp.calculate_performance()
check_perf_period(
pp,
gross_leverage=10.0,
net_leverage=10.0,
long_exposure=10000.0,
longs_count=1,
short_exposure=0.0,
shorts_count=0)
# Validate that the account attributes were updated.
pt.sync_last_sale_prices(trades[-2].dt, False, data_portal)
# Validate that the account attributes were updated.
account = pp.as_account()
check_account(account,
settled_cash=-9000.0,
equity_with_loan=1000.0,
total_positions_value=10000.0,
total_positions_exposure=10000.0,
regt_equity=-9000.0,
available_funds=-9000.0,
excess_liquidity=-9000.0,
cushion=-9.0,
leverage=10.0,
net_leverage=10.0,
net_liquidation=1000.0)
# now simulate a price jump to $11
pt.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp.calculate_performance()
check_perf_period(
pp,
gross_leverage=5.5,
net_leverage=5.5,
long_exposure=11000.0,
longs_count=1,
short_exposure=0.0,
shorts_count=0)
# Validate that the account attributes were updated.
account = pp.as_account()
check_account(account,
settled_cash=-9000.0,
equity_with_loan=2000.0,
total_positions_value=11000.0,
total_positions_exposure=11000.0,
regt_equity=-9000.0,
available_funds=-9000.0,
excess_liquidity=-9000.0,
cushion=-4.5,
leverage=5.5,
net_leverage=5.5,
net_liquidation=2000.0)
def test_long_position(self):
"""
verify that the performance period calculates properly for a
single buy transaction
"""
self.create_environment_stuff()
# post some trades in the market
trades = factory.create_trade_history(
self.asset1,
[10, 10, 10, 11],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
txn = create_txn(self.asset1, trades[1].dt, 10.0, 100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0,
self.sim_params.data_frequency,
period_open=self.sim_params.start_session,
period_close=self.sim_params.end_session)
pp.position_tracker = pt
pt.execute_transaction(txn)
pp.handle_execution(txn)
# This verifies that the last sale price is being correctly
# set in the positions. If this is not the case then returns can
# incorrectly show as sharply dipping if a transaction arrives
# before a trade. This is caused by returns being based on holding
# stocks with a last sale price of 0.
self.assertEqual(pp.positions[1].last_sale_price, 10.0)
pt.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp.calculate_performance()
self.assertEqual(
pp.cash_flow,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction \
cost of sole txn in test"
)
self.assertEqual(
len(pp.positions),
1,
"should be just one position")
self.assertEqual(
pp.positions[1].asset,
txn.asset,
"position should be in security with id 1")
self.assertEqual(
pp.positions[1].amount,
txn.amount,
"should have a position of {sharecount} shares".format(
sharecount=txn.amount
)
)
self.assertEqual(
pp.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades[-1].price,
"last sale should be same as last trade. \
expected {exp} actual {act}".format(
exp=trades[-1].price,
act=pp.positions[1].last_sale_price)
)
self.assertEqual(
pp.ending_value,
1100,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(pp.pnl, 100, "gain of 1 on 100 shares should be 100")
check_perf_period(
pp,
gross_leverage=1.0,
net_leverage=1.0,
long_exposure=1100.0,
longs_count=1,
short_exposure=0.0,
shorts_count=0)
# Validate that the account attributes were updated.
account = pp.as_account()
check_account(account,
settled_cash=0.0,
equity_with_loan=1100.0,
total_positions_value=1100.0,
total_positions_exposure=1100.0,
regt_equity=0.0,
available_funds=0.0,
excess_liquidity=0.0,
cushion=0.0,
leverage=1.0,
net_leverage=1.0,
net_liquidation=1100.0)
def test_short_position(self):
"""verify that the performance period calculates properly for a \
single short-sale transaction"""
self.create_environment_stuff(num_days=6)
trades = factory.create_trade_history(
self.asset1,
[10, 10, 10, 11, 10, 9],
[100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
trades_1 = trades[:-2]
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
txn = create_txn(self.asset1, trades[1].dt, 10.0, -100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp.position_tracker = pt
pt.execute_transaction(txn)
pp.handle_execution(txn)
pt.sync_last_sale_prices(trades_1[-1].dt, False, data_portal)
pp.calculate_performance()
self.assertEqual(
pp.cash_flow,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction\
cost of sole txn in test"
)
self.assertEqual(
len(pp.positions),
1,
"should be just one position")
self.assertEqual(
pp.positions[1].asset,
txn.asset,
"position should be in security from the transaction"
)
self.assertEqual(
pp.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
pp.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades_1[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
pp.ending_value,
-1100,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(pp.pnl, -100, "gain of 1 on 100 shares should be 100")
# simulate additional trades, and ensure that the position value
# reflects the new price
trades_2 = trades[-2:]
# simulate a rollover to a new period
pp.rollover()
pt.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp.calculate_performance()
self.assertEqual(
pp.cash_flow,
0,
"capital used should be zero, there were no transactions in \
performance period"
)
self.assertEqual(
len(pp.positions),
1,
"should be just one position"
)
self.assertEqual(
pp.positions[1].asset,
txn.asset,
"position should be in security from the transaction"
)
self.assertEqual(
pp.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
pp.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades_2[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
pp.ending_value,
-900,
"ending value should be price of last trade times number of \
shares in position")
self.assertEqual(
pp.pnl,
200,
"drop of 2 on -100 shares should be 200"
)
# now run a performance period encompassing the entire trade sample.
ptTotal = perf.PositionTracker(self.sim_params.data_frequency)
ppTotal = perf.PerformancePeriod(
1000.0, self.sim_params.data_frequency
)
ppTotal.position_tracker = pt
ptTotal.execute_transaction(txn)
ppTotal.handle_execution(txn)
ptTotal.sync_last_sale_prices(trades[-1].dt, False, data_portal)
ppTotal.calculate_performance()
self.assertEqual(
ppTotal.cash_flow,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction \
cost of sole txn in test"
)
self.assertEqual(
len(ppTotal.positions),
1,
"should be just one position"
)
self.assertEqual(
ppTotal.positions[1].asset,
txn.asset,
"position should be in security from the transaction"
)
self.assertEqual(
ppTotal.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
ppTotal.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
ppTotal.positions[1].last_sale_price,
trades_2[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
ppTotal.ending_value,
-900,
"ending value should be price of last trade times number of \
shares in position")
self.assertEqual(
ppTotal.pnl,
100,
"drop of 1 on -100 shares should be 100"
)
check_perf_period(
pp,
gross_leverage=0.8181,
net_leverage=-0.8181,
long_exposure=0.0,
longs_count=0,
short_exposure=-900.0,
shorts_count=1)
# Validate that the account attributes.
account = ppTotal.as_account()
check_account(account,
settled_cash=2000.0,
equity_with_loan=1100.0,
total_positions_value=-900.0,
total_positions_exposure=-900.0,
regt_equity=2000.0,
available_funds=2000.0,
excess_liquidity=2000.0,
cushion=1.8181,
leverage=0.8181,
net_leverage=-0.8181,
net_liquidation=1100.0)
def test_covering_short(self):
"""verify performance where short is bought and covered, and shares \
trade after cover"""
self.create_environment_stuff(num_days=10)
trades = factory.create_trade_history(
self.asset1,
[10, 10, 10, 11, 9, 8, 7, 8, 9, 10],
[100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
short_txn = create_txn(self.asset1, trades[1].dt, 10.0, -100)
cover_txn = create_txn(self.asset1, trades[6].dt, 7.0, 100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp.position_tracker = pt
pt.execute_transaction(short_txn)
pp.handle_execution(short_txn)
pt.execute_transaction(cover_txn)
pp.handle_execution(cover_txn)
pt.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp.calculate_performance()
short_txn_cost = short_txn.price * short_txn.amount
cover_txn_cost = cover_txn.price * cover_txn.amount
self.assertEqual(
pp.cash_flow,
-1 * short_txn_cost - cover_txn_cost,
"capital used should be equal to the net transaction costs"
)
self.assertEqual(
len(pp.positions),
0,
"should be zero positions"
)
self.assertEqual(
pp.ending_value,
0,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(
pp.pnl,
300,
"gain of 1 on 100 shares should be 300"
)
check_perf_period(
pp,
gross_leverage=0.0,
net_leverage=0.0,
long_exposure=0.0,
longs_count=0,
short_exposure=0.0,
shorts_count=0)
account = pp.as_account()
check_account(account,
settled_cash=1300.0,
equity_with_loan=1300.0,
total_positions_value=0.0,
total_positions_exposure=0.0,
regt_equity=1300.0,
available_funds=1300.0,
excess_liquidity=1300.0,
cushion=1.0,
leverage=0.0,
net_leverage=0.0,
net_liquidation=1300.0)
def test_cost_basis_calc(self):
self.create_environment_stuff(num_days=5)
history_args = (
self.asset1,
[10, 11, 11, 12, 10],
[100, 100, 100, 100, 100],
oneday,
self.sim_params,
self.trading_calendar,
)
trades = factory.create_trade_history(*history_args)
transactions = factory.create_txn_history(*history_args)[:4]
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(
1000.0,
self.sim_params.data_frequency,
period_open=self.sim_params.start_session,
period_close=self.sim_params.sessions[-1]
)
pp.position_tracker = pt
average_cost = 0
for i, txn in enumerate(transactions):
pt.execute_transaction(txn)
pp.handle_execution(txn)
average_cost = (average_cost * i + txn.price) / (i + 1)
self.assertEqual(pt.positions[1].cost_basis, average_cost)
dt = trades[-2].dt
self.assertEqual(
pt.positions[1].last_sale_price,
trades[-2].price,
"should have a last sale of 12, got {val}".format(
val=pt.positions[1].last_sale_price)
)
self.assertEqual(
pt.positions[1].cost_basis,
11,
"should have a cost basis of 11"
)
pt.sync_last_sale_prices(dt, False, data_portal)
pp.calculate_performance()
self.assertEqual(
pp.pnl,
400
)
down_tick = trades[-1]
sale_txn = create_txn(self.asset1, down_tick.dt, 10.0, -100)
pp.rollover()
pt.execute_transaction(sale_txn)
pp.handle_execution(sale_txn)
dt = down_tick.dt
pt.sync_last_sale_prices(dt, False, data_portal)
pp.calculate_performance()
self.assertEqual(
pp.positions[1].last_sale_price,
10,
"should have a last sale of 10, was {val}".format(
val=pp.positions[1].last_sale_price)
)
self.assertEqual(
pp.positions[1].cost_basis,
11,
"should have a cost basis of 11"
)
self.assertEqual(pp.pnl, -800, "this period goes from +400 to -400")
pt3 = perf.PositionTracker(self.sim_params.data_frequency)
pp3 = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp3.position_tracker = pt3
average_cost = 0
for i, txn in enumerate(transactions):
pt3.execute_transaction(txn)
pp3.handle_execution(txn)
average_cost = (average_cost * i + txn.price) / (i + 1)
self.assertEqual(pp3.positions[1].cost_basis, average_cost)
pt3.execute_transaction(sale_txn)
pp3.handle_execution(sale_txn)
trades.append(down_tick)
pt3.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp3.calculate_performance()
self.assertEqual(
pp3.positions[1].last_sale_price,
10,
"should have a last sale of 10"
)
self.assertEqual(
pp3.positions[1].cost_basis,
11,
"should have a cost basis of 11"
)
self.assertEqual(
pp3.pnl,
-400,
"should be -400 for all trades and transactions in period"
)
def test_cost_basis_calc_close_pos(self):
self.create_environment_stuff(num_days=8)
history_args = (
self.asset1,
[10, 9, 11, 8, 9, 12, 13, 14],
[200, -100, -100, 100, -300, 100, 500, 400],
oneday,
self.sim_params,
self.trading_calendar,
)
cost_bases = [10, 10, 0, 8, 9, 9, 13, 13.5]
transactions = factory.create_txn_history(*history_args)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0, self.sim_params.data_frequency)
pp.position_tracker = pt
for idx, (txn, cb) in enumerate(zip(transactions, cost_bases)):
pt.execute_transaction(txn)
pp.handle_execution(txn)
if idx == 2:
# buy 200, sell 100, sell 100 = 0 shares = no position
self.assertNotIn(1, pp.positions)
else:
self.assertEqual(pp.positions[1].cost_basis, cb)
pp.calculate_performance()
self.assertEqual(pp.positions[1].cost_basis, cost_bases[-1])
def test_capital_change_intra_period(self):
self.create_environment_stuff()
# post some trades in the market
trades = factory.create_trade_history(
self.asset1,
[10.0, 11.0, 12.0, 13.0],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
txn = create_txn(self.asset1, trades[0].dt, 10.0, 100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0,
self.sim_params.data_frequency,
period_open=self.sim_params.start_session,
period_close=self.sim_params.end_session)
pp.position_tracker = pt
pt.execute_transaction(txn)
pp.handle_execution(txn)
# sync prices before we introduce a capital change
pt.sync_last_sale_prices(trades[2].dt, False, data_portal)
pp.initialize_subperiod_divider()
pp.set_current_subperiod_starting_values(1000.0)
pt.sync_last_sale_prices(trades[-1].dt, False, data_portal)
pp.calculate_performance()
self.assertAlmostEqual(pp.returns, 1200/1000 * 2300/2200 - 1)
self.assertAlmostEqual(pp.pnl, 300)
self.assertAlmostEqual(pp.cash_flow, -1000)
def test_capital_change_inter_period(self):
self.create_environment_stuff()
# post some trades in the market
trades = factory.create_trade_history(
self.asset1,
[10.0, 11.0, 12.0, 13.0],
[100, 100, 100, 100],
oneday,
self.sim_params,
trading_calendar=self.trading_calendar,
)
data_portal = create_data_portal_from_trade_history(
self.env.asset_finder,
self.trading_calendar,
self.instance_tmpdir,
self.sim_params,
{1: trades})
txn = create_txn(self.asset1, trades[0].dt, 10.0, 100)
pt = perf.PositionTracker(self.sim_params.data_frequency)
pp = perf.PerformancePeriod(1000.0,
self.sim_params.data_frequency,
period_open=self.sim_params.start_session,
period_close=self.sim_params.end_session)
pp.position_tracker = pt
pt.execute_transaction(txn)
pp.handle_execution(txn)
pt.sync_last_sale_prices(trades[0].dt, False, data_portal)
pp.calculate_performance()
self.assertAlmostEqual(pp.returns, 0)
self.assertAlmostEqual(pp.pnl, 0)
self.assertAlmostEqual(pp.cash_flow, -1000)
pp.rollover()
pt.sync_last_sale_prices(trades[1].dt, False, data_portal)
pp.calculate_performance()
self.assertAlmostEqual(pp.returns, 1100.0/1000.0 - 1)
self.assertAlmostEqual(pp.pnl, 100)
self.assertAlmostEqual(pp.cash_flow, 0)
pp.rollover()
pp.adjust_period_starting_capital(1000)
pt.sync_last_sale_prices(trades[2].dt, False, data_portal)
pp.calculate_performance()
self.assertAlmostEqual(pp.returns, 2200.0/2100.0 - 1)
self.assertAlmostEqual(pp.pnl, 100)
self.assertAlmostEqual(pp.cash_flow, 0)
pp.rollover()
pt.sync_last_sale_prices(trades[3].dt, False, data_portal)
pp.calculate_performance()
self.assertAlmostEqual(pp.returns, 2300.0/2200.0 - 1)
self.assertAlmostEqual(pp.pnl, 100)
self.assertAlmostEqual(pp.cash_flow, 0)
class TestPositionTracker(WithTradingEnvironment,
WithInstanceTmpDir,
ZiplineTestCase):
ASSET_FINDER_EQUITY_SIDS = 1, 2
@classmethod
def init_class_fixtures(cls):
super(TestPositionTracker, cls).init_class_fixtures()
cls.EQUITY1 = cls.asset_finder.retrieve_asset(1)
cls.EQUITY2 = cls.asset_finder.retrieve_asset(2)
cls.FUTURE3 = cls.asset_finder.retrieve_asset(3)
cls.FUTURE4 = cls.asset_finder.retrieve_asset(4)
cls.FUTURE5 = cls.asset_finder.retrieve_asset(1032201401)
@classmethod
def make_futures_info(cls):
return pd.DataFrame.from_dict(
{
3: {'multiplier': 1000, 'exchange': 'TEST'},
4: {'multiplier': 1000, 'exchange': 'TEST'},
1032201401: {'multiplier': 50, 'exchange': 'TEST'},
},
orient='index',
)
def test_empty_positions(self):
"""
make sure all the empty position stats return a numeric 0
Originally this bug was due to np.dot([], []) returning
np.bool_(False)
"""
sim_params = factory.create_simulation_parameters(num_days=4)
pt = perf.PositionTracker(sim_params.data_frequency)
pos_stats = pt.stats()
stats = [
'net_value',
'net_exposure',
'gross_value',
'gross_exposure',
'short_value',
'short_exposure',
'shorts_count',
'long_value',
'long_exposure',
'longs_count',
]
for name in stats:
val = getattr(pos_stats, name)
self.assertEquals(val, 0)
self.assertNotIsInstance(val, (bool, np.bool_))
def test_position_values_and_exposures(self):
pt = perf.PositionTracker(None)
dt = pd.Timestamp("1984/03/06 3:00PM")
pt.update_position(
self.EQUITY1, amount=np.float64(10.0),
last_sale_date=dt, last_sale_price=10
)
pt.update_position(
self.EQUITY2, amount=np.float64(-20.0),
last_sale_date=dt, last_sale_price=10
)
pt.update_position(
self.FUTURE3, amount=np.float64(30.0),
last_sale_date=dt, last_sale_price=10
)
pt.update_position(
self.FUTURE4, amount=np.float64(-40.0),
last_sale_date=dt, last_sale_price=10
)
# Test long-only methods
pos_stats = pt.stats()
self.assertEqual(100, pos_stats.long_value)
self.assertEqual(100 + 300000, pos_stats.long_exposure)
self.assertEqual(2, pos_stats.longs_count)
# Test short-only methods
self.assertEqual(-200, pos_stats.short_value)
self.assertEqual(-200 - 400000, pos_stats.short_exposure)
self.assertEqual(2, pos_stats.shorts_count)
# Test gross and net values
self.assertEqual(100 + 200, pos_stats.gross_value)
self.assertEqual(100 - 200, pos_stats.net_value)
# Test gross and net exposures
self.assertEqual(100 + 200 + 300000 + 400000, pos_stats.gross_exposure)
self.assertEqual(100 - 200 + 300000 - 400000, pos_stats.net_exposure)
def test_cost_basis(self):
dt = pd.Timestamp("2015-12-10 15:00", tz='UTC')
equity_pos = perf.Position(
self.EQUITY1,
amount=10,
last_sale_date=dt,
cost_basis=10,
last_sale_price=11,
)
future_pos = perf.Position(
self.FUTURE3,
amount=10,
last_sale_date=dt,
cost_basis=10,
last_sale_price=11,
)
self.assertEqual(10, equity_pos.cost_basis)
# send a $5 commission to the equity position. Spread out over 10
# shares, that bumps the cost basis by $0.50.
equity_pos.adjust_commission_cost_basis(self.EQUITY1, 5)
self.assertEqual(10.5, equity_pos.cost_basis)
self.assertEqual(10, future_pos.cost_basis)
# send a $5k commission to the futures position. since self.FUTURE3
# has a contract size (multipler) of 1000, this should result in a
# $10.5 updated cost basis. (5000 / 1000 = $5, spread out over 10
# contracts, is $0.50 extra per contract).
future_pos.adjust_commission_cost_basis(self.FUTURE3, 5000)
self.assertEqual(10.5, future_pos.cost_basis)
def test_update_positions(self):
pt = perf.PositionTracker(None)
dt = pd.Timestamp("2014/01/01 3:00PM")
# pos1 = perf.Position(self.EQUITY1, amount=np.float64(10.0),
# last_sale_date=dt, last_sale_price=10)
# pos2 = perf.Position(self.EQUITY2, amount=np.float64(-20.0),
# last_sale_date=dt, last_sale_price=10)
# pos3 = perf.Position(self.FUTURE5, amount=np.float64(30.0),
# last_sale_date=dt, last_sale_price=100)
pt.update_position(
self.EQUITY1,
amount=np.float64(10.0),
last_sale_price=10,
last_sale_date=dt
)
pt.update_position(
self.EQUITY2,
amount=np.float64(-20.0),
last_sale_price=10,
last_sale_date=dt
)
pt.update_position(
self.FUTURE5,
amount=np.float64(30.0),
last_sale_price=100,
last_sale_date=dt
)
pos_stats = pt.stats()
# Test long-only methods
self.assertEqual(100, pos_stats.long_value)
# 150,000 = 30 * 100 * 50 (amount * last_sale_price * multiplier)
self.assertEqual(100 + 150000, pos_stats.long_exposure)
self.assertEqual(2, pos_stats.longs_count)
# Test short-only methods
self.assertEqual(-200, pos_stats.short_value)
self.assertEqual(-200, pos_stats.short_exposure)
self.assertEqual(1, pos_stats.shorts_count)
# Test gross and net values
self.assertEqual(100 + 200, pos_stats.gross_value)
self.assertEqual(100 - 200, pos_stats.net_value)
# Test gross and net exposures
self.assertEqual(100 + 150000 + 200, pos_stats.gross_exposure)
self.assertEqual(100 + 150000 - 200, pos_stats.net_exposure)
def test_close_position(self):
pt = perf.PositionTracker(None)
dt = pd.Timestamp('2017/01/04 3:00PM')
pt.update_position(
asset=self.FUTURE5, amount=np.float64(30.0),
last_sale_date=dt, last_sale_price=100
)
pt.update_position(
asset=self.EQUITY1, amount=np.float64(10.0),
last_sale_date=dt, last_sale_price=10
)
txn = create_txn(self.FUTURE5, dt, 100, -30)
pt.execute_transaction(txn)
pos_stats = pt.stats()
# Test long-only methods.
self.assertEqual(100, pos_stats.long_value)
self.assertEqual(100, pos_stats.long_exposure)
self.assertEqual(1, pos_stats.longs_count)
# Test short-only methods.
self.assertEqual(0, pos_stats.short_value)
self.assertEqual(0, pos_stats.short_exposure)
self.assertEqual(0, pos_stats.shorts_count)
# Test gross and net values.
self.assertEqual(100, pos_stats.gross_value)
self.assertEqual(100, pos_stats.net_value)
# Test gross and net exposures.
self.assertEqual(100, pos_stats.gross_exposure)
self.assertEqual(100, pos_stats.net_exposure)