Removed asset backtester

This commit is contained in:
Juan Pablo Amoroso
2020-03-20 15:48:08 -03:00
parent 562e43bd63
commit 59def63d6a
14 changed files with 0 additions and 6949 deletions
-3
View File
@@ -1,3 +0,0 @@
from . import datahandler, charts
from .backtester import Backtest
from .portfolio import *
-121
View File
@@ -1,121 +0,0 @@
import pandas as pd
import pyprind
from .portfolio import Portfolio
class Backtest:
    """Backtests a fixed-allocation asset portfolio over historical price data.

    The constructor stores `schema` (indexed with ``'symbol'`` and
    ``'adjClose'`` to find the matching data columns) and `initial_capital`
    (the cash available before the first rebalance).

    `run` additionally sets `current_cash`, `current_capital`, `inventory`
    (one row per held asset with columns ``symbol``, ``cost``, ``qty``) and
    `balance` (one row per processed date).
    """

    def __init__(self, schema, initial_capital=1_000_000):
        self.schema = schema
        self._portfolio = None
        self._data = None
        self.initial_capital = initial_capital

    @property
    def portfolio(self):
        """The `Portfolio` whose assets are backtested."""
        return self._portfolio

    @portfolio.setter
    def portfolio(self, portfolio):
        assert isinstance(portfolio, Portfolio)
        self._portfolio = portfolio

    @property
    def data(self):
        """The historical data container the backtest iterates over."""
        return self._data

    @data.setter
    def data(self, data):
        self._data = data

    def run(self, periods=1, sma_days=None):
        """Runs a backtest and returns a dataframe with the daily balance

        `periods` is the number of business-month starts between rebalances;
        `None` disables periodic rebalancing (the portfolio is still bought
        on the first day). When `sma_days` is truthy, a rolling mean over that
        window is computed on the data and used as a buy filter.
        """
        assert self._data is not None
        assert self._portfolio is not None

        self.current_capital = 0
        self.current_cash = self.initial_capital
        self.inventory = pd.DataFrame(columns=['symbol', 'cost', 'qty'])
        self.balance = pd.DataFrame()

        if sma_days:
            self._data.sma(sma_days)

        data_iterator = self._data.iter_dates()
        first_day = self._data['date'].min()
        last_day = self._data['date'].max()
        rebalancing_days = pd.date_range(first_day, last_day, freq=str(periods) +
                                         'BMS').to_pydatetime() if periods is not None else []

        bar = pyprind.ProgBar(data_iterator.ngroups, bar_char='')

        # Seed the balance with a row dated one day before the data starts so
        # that the first real day has a previous value for pct_change().
        self.balance = pd.DataFrame({
            'capital': self.current_cash,
            'cash': self.current_cash
        },
                                    index=[self._data.start_date - pd.Timedelta(1, unit='day')])

        for date, data in data_iterator:
            if date == first_day:
                self._rebalance_portfolio(data, sma_days)

            self._update_balance(date, data)

            if date in rebalancing_days:
                self._rebalance_portfolio(data, sma_days)

            bar.update()

        self.balance['% change'] = self.balance['capital'].pct_change()
        self.balance['accumulated return'] = (1.0 + self.balance['% change']).cumprod()

        return self.balance

    def _rebalance_portfolio(self, data, sma_days):
        """Rebalances the portfolio so that the total money is allocated according to the given percentages

        `data` holds the rows for a single date. When `sma_days` is not None an
        asset is only bought while its ``sma`` value is below its price;
        otherwise its quantity is set to zero.
        """
        money_total = self.current_cash + self.current_capital
        symbol_col = self.schema['symbol']
        price_col = self.schema['adjClose']

        for asset in self._portfolio.assets:
            asset_current = data[data[symbol_col] == asset.symbol]
            asset_price = asset_current[price_col].values[0]

            if sma_days is None or asset_current['sma'].values[0] < asset_price:
                qty = (money_total * asset.percentage) // asset_price
            else:
                qty = 0

            # The inventory frame always uses the literal 'symbol' column,
            # independently of how the price data names its symbol column.
            kept = self.inventory[self.inventory['symbol'] != asset.symbol]
            entry = pd.DataFrame([[asset.symbol, asset_price, qty]], columns=self.inventory.columns)
            # DataFrame.append no longer exists in pandas 2.x; concat is the
            # supported replacement. Skip concat when nothing is kept so an
            # empty frame does not take part in dtype inference.
            self.inventory = entry if kept.empty else pd.concat([kept, entry], ignore_index=True)

        # Update current cash
        invested_capital = (self.inventory['cost'] * self.inventory['qty']).sum()
        self.current_cash = money_total - invested_capital

    def _update_balance(self, date, data):
        """Updates self.balance for the given date

        Values every portfolio asset at the price found in `data`, stores the
        sum in `current_capital` and adds a row indexed by `date` with the
        columns ``total value``, ``cash`` and ``capital``.
        """
        symbol_col = self.schema['symbol']
        price_col = self.schema['adjClose']

        total_value = 0
        for asset in self._portfolio.assets:
            cost = data.loc[data[symbol_col] == asset.symbol, price_col].values[0]
            qty = self.inventory.loc[self.inventory['symbol'] == asset.symbol, 'qty'].values[0]
            total_value += cost * qty

        self.current_capital = total_value
        money_total = total_value + self.current_cash

        row = pd.DataFrame([{
            'total value': total_value,
            'cash': self.current_cash,
            'capital': money_total,
        }],
                           index=[date])
        self.balance = row if self.balance.empty else pd.concat([self.balance, row])
-78
View File
@@ -1,78 +0,0 @@
"""Generates charts from a portfolio report"""
import altair as alt
import pandas as pd
def returns_chart(report):
    """Build the interactive "Wealth over time" chart from a balance report.

    The result stacks a main layered chart (area, hover point and hover label
    of the ``accumulated return`` column) on top of a short overview strip.
    Brushing the strip sets the x-domain of the main chart. `report` is
    passed to altair with its index reset into an ``index`` column.
    """
    # Interval brush on the x axis; created first so selection numbering is unchanged.
    brush = alt.selection(type='interval', encodings=['x'])

    # Base area plot shared by every layer.
    return_axis = alt.Y('accumulated return:Q', axis=alt.Axis(format='%'))
    area = alt.Chart().mark_area(opacity=0.7).encode(x='index:T', y=return_axis)

    # Selection that follows the point nearest to the mouse.
    hover = alt.selection(type='single', nearest=True, on='mouseover', fields=['index'], empty='none')
    hover_opacity = alt.condition(hover, alt.value(1), alt.value(0))
    hover_point = area.mark_point().encode(opacity=hover_opacity)

    # Invisible points that carry the hover selection.
    hover_targets = alt.Chart().mark_point().encode(
        x='index:T',
        opacity=alt.value(0),
    ).add_selection(hover)

    # Label showing the value under the mouse.
    label_text = alt.condition(hover, 'accumulated return:Q', alt.value(' '), format='.2%')
    hover_label = area.mark_text(align='left', dx=5, dy=-5).encode(text=label_text)

    # Main chart: x domain is driven by the brush on the overview strip.
    zoomed_x = alt.X('index:T', axis=alt.Axis(title='date'), scale=alt.Scale(domain=brush))
    main = alt.layer(hover_targets,
                     hover_point,
                     hover_label,
                     area.encode(zoomed_x),
                     width=700,
                     height=350,
                     title='Wealth over time')

    overview = area.properties(width=700, height=70).add_selection(brush)

    return alt.vconcat(main, overview, data=report.reset_index())
def returns_histogram(report):
    """Return a bar chart counting rows of `report` per bin of its ``% change`` column."""
    binned_change = alt.X('% change:Q', bin=alt.BinParams(maxbins=100), axis=alt.Axis(format='%'))
    return alt.Chart(report).mark_bar().encode(x=binned_change, y='count():Q')
def monthly_returns_heatmap(report):
    """Return a year-by-month heatmap of the monthly change of ``capital``.

    `report` must be indexed by date (it is resampled with ``'M'``) and have a
    ``capital`` column. The first month has no previous month-end value, so
    its return is measured against the first row of `report` instead.
    """
    month_end_capital = report.resample('M')['capital'].last()
    returns = month_end_capital.pct_change()
    # Assign on the Series itself. The previous `frame['capital'].iat[0] = ...`
    # was a chained assignment, which does not write through to the frame when
    # pandas copy-on-write is active.
    returns.iloc[0] = month_end_capital.iloc[0] / report['capital'].iloc[0] - 1

    monthly_returns = returns.reset_index()
    monthly_returns.columns = ['date', 'capital']

    chart = alt.Chart(monthly_returns).mark_rect().encode(
        alt.X('year(date):O', title='Year'), alt.Y('month(date):O', title='Month'),
        alt.Color('mean(capital)', title='Return', scale=alt.Scale(scheme='redyellowgreen')),
        alt.Tooltip('mean(capital)', format='.2f')).properties(title='Monthly Returns')

    return chart
def sma_graph(data):
    """Overlay each symbol's ``adjClose`` line (faded) with its dashed ``sma`` line."""
    size = {'width': 700, 'height': 350}

    prices = alt.Chart(data, **size).mark_line().encode(
        x='date:T',
        y=alt.Y('adjClose:Q'),
        color='symbol:N',
        opacity=alt.value(0.3),
    )
    averages = alt.Chart(data, **size).mark_line(strokeDash=[1, 1]).encode(
        x='date:T',
        y=alt.Y('sma:Q'),
        color='symbol:N',
    )

    return prices + averages
-2
View File
@@ -1,2 +0,0 @@
from .schema import *
from .historical_asset_data import HistoricalAssetData
@@ -1,79 +0,0 @@
import os
from .schema import Schema
import pandas as pd
class HistoricalAssetData:
    """Historical Asset Data container class.

    Wraps a `pd.DataFrame` loaded from a ``.h5`` or ``.csv`` file together
    with a schema that maps canonical field names to the frame's column
    names. Attributes not found on the container are looked up on the
    underlying frame.
    """

    def __init__(self, file, schema=None, **params):
        """Load `file` into a dataframe.

        `schema` defaults to `default_schema()`. `params` are forwarded to
        `pd.read_hdf` / `pd.read_csv`; for CSV files the schema's date column
        is parsed as dates. Raises `ValueError` for any other file extension.
        """
        if schema is None:
            self.schema = HistoricalAssetData.default_schema()
        else:
            assert isinstance(schema, Schema)
            # Previously a user-supplied schema was validated but never stored.
            self.schema = schema

        file_extension = os.path.splitext(file)[1].lower()

        if file_extension == '.h5':
            self._data = pd.read_hdf(file, **params)
        elif file_extension == '.csv':
            params['parse_dates'] = [self.schema['date']]
            self._data = pd.read_csv(file, **params)
        else:
            raise ValueError("Unsupported file extension {!r}; expected '.h5' or '.csv'".format(file_extension))

        columns = self._data.columns
        # Iterating the schema yields (key, column) pairs.
        assert all((col in columns for _key, col in self.schema))

        date_col = self.schema['date']
        self.start_date = self._data[date_col].min()
        self.end_date = self._data[date_col].max()

    def apply_filter(self, f):
        """Apply Filter `f` to the data. Returns a `pd.DataFrame` with the filtered rows."""
        return self._data.query(f.query)

    def iter_dates(self):
        """Returns `pd.DataFrameGroupBy` that groups contracts by date"""
        return self._data.groupby(self.schema['date'])

    def __getattr__(self, attr):
        """Pass attribute and method lookups to `self._data`"""
        # __getattr__ only runs when normal lookup fails. If `_data` itself is
        # missing (instance not fully initialised), delegating would recurse
        # forever, so fail with a regular AttributeError instead.
        if attr == '_data':
            raise AttributeError(attr)
        return getattr(self._data, attr)

    def __getitem__(self, item):
        """Return the rows selected by a boolean Series, or the column mapped to schema key `item`."""
        if isinstance(item, pd.Series):
            return self._data[item]
        else:
            key = self.schema[item]
            return self._data[key]

    def __setitem__(self, key, value):
        """Assign `value` to the column behind `key`, registering `key` in the schema if it is new."""
        if key in self.schema:
            # Write to the same column __getitem__ reads from.
            column = self.schema[key]
        else:
            column = key
            self.schema.update({key: key})
        self._data[column] = value

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return self._data.__repr__()

    @staticmethod
    def default_schema():
        """Returns default schema for Historical Asset Data"""
        schema = Schema.canonical()
        return schema

    def sma(self, months):
        """Add an ``sma`` column: the per-symbol rolling mean of the adjusted close.

        `months` is the rolling window size in rows. Rows without a full
        window get 0. The column is also registered in the schema.
        """
        symbol_col = self.schema['symbol']
        price_col = self.schema['adjClose']
        rolling_mean = self._data.groupby(symbol_col)[price_col].rolling(months).mean()
        # Drop the symbol level so the values align with the frame's own index.
        self._data['sma'] = rolling_mean.reset_index(level=0, drop=True).fillna(0)
        self.schema.update({'sma': 'sma'})
-162
View File
@@ -1,162 +0,0 @@
class Schema:
    """Data schema class.

    Used to run validations and provide uniform access to fields in the data set.
    Maps canonical field names (see `columns`) to the column names used by a
    concrete data set.
    """

    # Keys every schema must provide a mapping for.
    columns = [
        "symbol", "date", "open", "close", "high", "low", "volume", "adjClose", "adjHigh", "adjLow", "adjOpen",
        "adjVolume", "divCash", "splitFactor"
    ]

    @staticmethod
    def canonical():
        """Builder method that returns a `Schema` with default mappings"""
        mappings = {key: key for key in Schema.columns}
        return Schema(mappings)

    def __init__(self, mappings):
        assert all((key in mappings for key in Schema.columns))
        # Copy so that `update` / `__setitem__` never mutate the caller's dict.
        self._mappings = dict(mappings)

    def update(self, mappings):
        """Update schema according to given `mappings`"""
        self._mappings.update(mappings)
        return self

    def __contains__(self, key):
        """Returns True if key is in schema"""
        return key in self._mappings

    def __getattr__(self, key):
        """Returns Field object used to build Filters

        Raises `AttributeError` for unknown keys so that `hasattr`, `copy`
        and `dict(schema)` see a regular missing attribute.
        """
        # Go through __dict__ so a missing `_mappings` cannot re-enter __getattr__.
        mappings = self.__dict__.get('_mappings', {})
        try:
            mapping = mappings[key]
        except KeyError:
            raise AttributeError("Schema has no field {!r}".format(key)) from None
        return Field(key, mapping)

    def __setitem__(self, key, value):
        self._mappings[key] = value

    def __getitem__(self, key):
        """Returns mapping of given `key`"""
        return self._mappings[key]

    def __iter__(self):
        """Iterates over `(key, mapping)` pairs"""
        return iter(self._mappings.items())

    def __repr__(self):
        return "Schema({})".format([Field(k, m) for k, m in self._mappings.items()])

    def __eq__(self, other):
        if not isinstance(other, Schema):
            return NotImplemented
        return self._mappings == other._mappings

    # Schemas are mutable and compare by value, so they stay unhashable.
    __hash__ = None
class Field:
    """Encapsulates data fields to build filters used by strategies

    `name` is the schema key and `mapping` the data column it refers to.
    Arithmetic operators combine fields into a new `Field` whose name and
    mapping are expression strings; comparison operators return a `Filter`
    holding the comparison as a query string.
    """

    __slots__ = ("name", "mapping")

    def __init__(self, name, mapping):
        self.name = name
        self.mapping = mapping

    def _create_filter(self, op, other):
        """Return a `Filter` comparing this field's column with `other` using `op`."""
        if isinstance(other, Field):
            query = Field._format_query(self.mapping, op, other.mapping)
        else:
            query = Field._format_query(self.mapping, op, other)
        return Filter(query)

    def _combine_fields(self, op, other, invert=False):
        """Return a new `Field` for ``self op other`` (operands swapped when `invert`).

        Raises `TypeError` unless `other` is a `Field`, `int` or `float`.
        """
        if isinstance(other, Field):
            name = Field._format_query(self.name, op, other.name, invert)
            mapping = Field._format_query(self.mapping, op, other.mapping, invert)
        elif isinstance(other, (int, float)):
            name = Field._format_query(self.name, op, other, invert)
            mapping = Field._format_query(self.mapping, op, other, invert)
        else:
            raise TypeError("cannot combine Field with {}".format(type(other).__name__))
        return Field(name, mapping)

    @staticmethod
    def _format_query(left, op, right, invert=False):
        """Render ``left op right``, or ``right op left`` when `invert` is true."""
        if invert:
            left, right = right, left
        query = "{left} {op} {right}".format(left=left, op=op, right=right)
        return query

    @staticmethod
    def _quote(value):
        """Wrap string operands in single quotes so the query treats them as literals."""
        if isinstance(value, str):
            return "'{}'".format(value)
        return value

    def __add__(self, value):
        return self._combine_fields("+", value)

    def __radd__(self, value):
        return self._combine_fields("+", value, invert=True)

    def __sub__(self, value):
        return self._combine_fields("-", value)

    def __rsub__(self, value):
        return self._combine_fields("-", value, invert=True)

    def __mul__(self, value):
        return self._combine_fields("*", value)

    def __rmul__(self, value):
        return self._combine_fields("*", value, invert=True)

    def __truediv__(self, value):
        return self._combine_fields("/", value)

    def __rtruediv__(self, value):
        return self._combine_fields("/", value, invert=True)

    def __lt__(self, value):
        return self._create_filter("<", value)

    def __le__(self, value):
        return self._create_filter("<=", value)

    def __gt__(self, value):
        return self._create_filter(">", value)

    def __ge__(self, value):
        return self._create_filter(">=", value)

    def __eq__(self, value):
        return self._create_filter("==", Field._quote(value))

    def __ne__(self, value):
        # Quote strings exactly like __eq__; unquoted text would be read as a
        # column name by the query engine.
        return self._create_filter("!=", Field._quote(value))

    def __repr__(self):
        return "Field(name='{}', mapping='{}')".format(self.name, self.mapping)
class Filter:
    """This class determines entry/exit conditions for strategies

    A filter wraps a boolean expression string (`query`) that can be
    combined with `&`, `|` and `~` and evaluated against a dataframe.
    """

    __slots__ = ("query",)

    def __init__(self, query):
        self.query = query

    def __and__(self, other):
        """Returns logical *and* between `self` and `other`"""
        assert isinstance(other, Filter)
        new_query = "({}) & ({})".format(self.query, other.query)
        return Filter(query=new_query)

    def __or__(self, other):
        """Returns logical *or* between `self` and `other`"""
        assert isinstance(other, Filter)
        new_query = "(({}) | ({}))".format(self.query, other.query)
        return Filter(query=new_query)

    def __invert__(self):
        """Negates filter"""
        # pandas eval/query expressions negate with `~` (or `not`); `!` is not
        # valid syntax there, so the previous "!(...)" form could not be evaluated.
        return Filter("~({})".format(self.query))

    def __call__(self, data):
        """Returns the result of evaluating the query expression on `data`"""
        return data.eval(self.query)

    def __repr__(self):
        return "Filter(query='{}')".format(self.query)
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
-2
View File
@@ -1,2 +0,0 @@
from .portfolio import Portfolio
from .asset import Asset
-8
View File
@@ -1,8 +0,0 @@
class Asset:
    """Asset data class: a ticker symbol plus the portfolio fraction allocated to it."""

    def __init__(self, symbol, percentage):
        self.symbol, self.percentage = symbol, percentage

    def __repr__(self):
        template = "Asset(symbol={}, percentage={})"
        return template.format(self.symbol, self.percentage)
-28
View File
@@ -1,28 +0,0 @@
from .asset import Asset
class Portfolio:
    """Ordered collection of `Asset` objects; every mutator returns the portfolio for chaining."""

    def __init__(self):
        self.assets = list()

    def add_asset(self, asset):
        """Append a single `Asset` and return the portfolio."""
        assert isinstance(asset, Asset)
        holdings = self.assets
        holdings.append(asset)
        return self

    def add_assets(self, assets):
        """Append every item of `assets`, in order, and return the portfolio."""
        for candidate in assets:
            self.add_asset(candidate)
        return self

    def remove_asset(self, asset_number):
        """Remove the asset at position `asset_number` and return the portfolio."""
        holdings = self.assets
        holdings.pop(asset_number)
        return self

    def clear_assets(self):
        """Replace the asset list with a new empty list and return the portfolio."""
        self.assets = list()
        return self
@@ -1,103 +0,0 @@
import numpy as np
from asset_backtester import Backtest, Portfolio, Asset
# We use Portfolio Visualizer (https://www.portfoliovisualizer.com/backtest-portfolio)
# to find the actual return for the test portfolios.
def test_ivy_portfolio(sample_datahandler):
    """Ivy portfolio without periodic rebalancing: balance is consistent and the final return matches the reference."""
    expected_return = 1.2041
    backtest = run_backtest(sample_datahandler, ivy_portfolio())
    history = backtest.balance[1:]  # skip the seed row
    capital = history['capital']
    accumulated = history['accumulated return']

    assert np.allclose(capital, history['cash'] + history['total value'], rtol=0.0001)
    assert np.allclose(capital, backtest.initial_capital * accumulated, rtol=0.0001)
    assert np.isclose(accumulated.iloc[-1], expected_return, rtol=0.01)
def test_ivy_monthly_rebalance(sample_datahandler):
    """Ivy portfolio rebalanced with periods=1: balance is consistent and the final return matches the reference."""
    expected_return = 1.2043
    backtest = run_backtest(sample_datahandler, ivy_portfolio(), periods=1)
    history = backtest.balance[1:]  # skip the seed row
    capital = history['capital']
    accumulated = history['accumulated return']

    assert np.allclose(capital, history['cash'] + history['total value'], rtol=0.0001)
    assert np.allclose(capital, backtest.initial_capital * accumulated, rtol=0.0001)
    assert np.isclose(accumulated.iloc[-1], expected_return, rtol=0.01)
def test_all_weather_portfolio(sample_datahandler):
    """All-weather portfolio without periodic rebalancing: balance is consistent and the final return matches the reference."""
    expected_return = 1.1874
    backtest = run_backtest(sample_datahandler, all_weather_portfolio())
    history = backtest.balance[1:]  # skip the seed row
    capital = history['capital']
    accumulated = history['accumulated return']

    assert np.allclose(capital, history['cash'] + history['total value'], rtol=0.0001)
    assert np.allclose(capital, backtest.initial_capital * accumulated, rtol=0.0001)
    assert np.isclose(accumulated.iloc[-1], expected_return, rtol=0.01)
def test_all_weather_monthly_rebalance(sample_datahandler):
    """All-weather portfolio rebalanced with periods=1: balance is consistent and the final return matches the reference."""
    expected_return = 1.1828
    backtest = run_backtest(sample_datahandler, all_weather_portfolio(), periods=1)
    history = backtest.balance[1:]  # skip the seed row
    capital = history['capital']
    accumulated = history['accumulated return']

    assert np.allclose(capital, history['cash'] + history['total value'], rtol=0.0001)
    assert np.allclose(capital, backtest.initial_capital * accumulated, rtol=0.0001)
    assert np.isclose(accumulated.iloc[-1], expected_return, rtol=0.01)
def test_constant_price(constant_price_datahandler):
    """With constant prices the balance never changes after the initial purchase."""
    backtest = run_backtest(constant_price_datahandler, ivy_portfolio())
    history = backtest.balance[1:]  # skip the seed row

    expectations = (
        ('% change', 0.0),
        ('capital', backtest.initial_capital),
        ('total value', backtest.initial_capital),
        ('accumulated return', 1.0),
    )
    for column, expected in expectations:
        assert np.allclose(history[column], expected, rtol=0.0001)
def test_zero_initial_capital(sample_datahandler):
    """Starting with no money, cash and invested value stay at zero."""
    backtest = run_backtest(sample_datahandler, ivy_portfolio(), initial_capital=0)
    history = backtest.balance[1:]  # skip the seed row
    cash = history['cash']
    invested = history['total value']

    assert np.allclose(history['capital'], cash + invested, rtol=0.0001)
    assert np.allclose(cash, 0.0, rtol=0.0001)
    assert np.allclose(invested, 0.0, rtol=0.0001)
# Helpers
def run_backtest(data, portfolio, initial_capital=1_000_000, periods=None):
    """Build a `Backtest` from `data` and `portfolio`, run it, and return the backtest object."""
    backtest = Backtest(data.schema, initial_capital=initial_capital)
    backtest.portfolio = portfolio
    backtest.data = data
    backtest.run(periods=periods)
    return backtest
def ivy_portfolio():
    """Return a portfolio holding VTI, VEU, BND, VNQ and DBC at 20% each."""
    symbols = ('VTI', 'VEU', 'BND', 'VNQ', 'DBC')
    return Portfolio().add_assets([Asset(symbol, 0.2) for symbol in symbols])
def all_weather_portfolio():
    """Return a portfolio of VTI 30%, TLT 40%, IEF 15%, GLD 7.5% and DBC 7.5%."""
    weights = (('VTI', 0.3), ('TLT', 0.4), ('IEF', 0.15), ('GLD', 0.075), ('DBC', 0.075))
    return Portfolio().add_assets([Asset(symbol, weight) for symbol, weight in weights])
-23
View File
@@ -1,23 +0,0 @@
import os
import pytest
from asset_backtester.datahandler import HistoricalAssetData
# Absolute path of the directory containing this module.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
# 2019 data for TLT, GLD, IEF, VTI, VEU, BND, VNQ and DBC
SAMPLE_DATA = os.path.join(TEST_DIR, 'test_data', 'sample_data.csv')
@pytest.fixture(scope='module')
def sample_datahandler():
    """Module-scoped fixture: the sample CSV loaded into a `HistoricalAssetData`."""
    return HistoricalAssetData(SAMPLE_DATA)
@pytest.fixture(scope='module')
def constant_price_datahandler():
    """Module-scoped fixture: the sample data with `adjClose` and `close` both set to 10.0."""
    flat_price = 10.0
    handler = HistoricalAssetData(SAMPLE_DATA)
    handler['adjClose'] = flat_price
    handler['close'] = flat_price
    return handler
File diff suppressed because it is too large Load Diff