First working version with the backtest and live modes executing the same algorithm.

This commit is contained in:
fredfortier
2017-09-21 19:05:16 -04:00
parent 2f8768bb06
commit ddecd6bb48
7 changed files with 232 additions and 43 deletions
@@ -33,24 +33,25 @@ def initialize(context):
def _handle_data(context, data):
prices = data.history(
context.asset,
fields='price',
bar_count=20,
frequency='30m'
)
rsi = talib.RSI(prices.values, timeperiod=14)[-1]
log.info('got rsi: {}'.format(rsi))
# prices = data.history(
# context.asset,
# fields='price',
# bar_count=20,
# frequency='30m'
# )
# rsi = talib.RSI(prices.values, timeperiod=14)[-1]
# log.info('got rsi: {}'.format(rsi))
# Buying more when RSI is low, this should lower our cost basis
if rsi <= 30:
buy_increment = 1
elif rsi <= 40:
buy_increment = 0.5
elif rsi <= 70:
buy_increment = 0.1
else:
buy_increment = None
# if rsi <= 30:
# buy_increment = 1
# elif rsi <= 40:
# buy_increment = 0.5
# elif rsi <= 70:
# buy_increment = 0.1
# else:
# buy_increment = None
buy_increment = 0.1
cash = context.portfolio.cash
log.info('base currency available: {cash}'.format(cash=cash))
@@ -62,10 +63,10 @@ def _handle_data(context, data):
log.warn('no pricing data')
return
record(price=price, rsi=rsi)
record(price=price)
orders = get_open_orders(context.asset)
if orders:
if len(orders) > 0:
log.info('skipping bar until all open orders execute')
return
@@ -104,7 +105,6 @@ def _handle_data(context, data):
if is_buy:
if buy_increment is None:
log.info('the rsi is too high to consider buying {}'.format(rsi))
return
if price * buy_increment > cash:
@@ -117,11 +117,13 @@ def _handle_data(context, data):
cost_basis
)
)
limit_price = price * (1 + context.SLIPPAGE_ALLOWED)
order(
asset=context.asset,
amount=buy_increment,
limit_price=price * (1 + context.SLIPPAGE_ALLOWED)
limit_price=limit_price
)
pass
def handle_data(context, data):
+1 -1
View File
@@ -448,7 +448,7 @@ class Bitfinex(Exchange):
order_statuses['message'])
)
orders = list()
orders = []
for order_status in order_statuses:
order, executed_price = self._create_order(order_status)
if asset is None or asset == order.sid:
+27 -15
View File
@@ -15,13 +15,12 @@ import abc
import os
from time import sleep
import collections
import pandas as pd
from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.data.bundles.core import load, from_bundle_ingest_dirname, \
BundleData, minute_path, five_minute_path, daily_path
from catalyst.data.bundles.core import from_bundle_ingest_dirname, \
minute_path, five_minute_path, daily_path
from catalyst.data.data_portal import DataPortal
from catalyst.data.five_minute_bars import BcolzFiveMinuteBarReader
from catalyst.data.minute_bars import BcolzMinuteBarReader
@@ -200,6 +199,9 @@ class DataPortalExchangeBase(DataPortal):
)
def get_spot_value(self, assets, field, dt, data_frequency):
if field == 'price':
field = 'close'
return self._get_spot_value(assets, field, dt, data_frequency)
@abc.abstractmethod
@@ -263,20 +265,29 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
if time_folder is None:
raise BundleNotFoundError(exchange=exchange_name)
self.daily_bar_readers[exchange_name] = \
BcolzDailyBarReader(
daily_path(name, time_folder),
)
try:
self.daily_bar_readers[exchange_name] = \
BcolzDailyBarReader(
daily_path(name, time_folder),
)
except IOError:
self.daily_bar_readers[exchange_name] = None
self.five_minute_bar_readers[exchange_name] = \
BcolzFiveMinuteBarReader(
five_minute_path(name, time_folder),
)
try:
self.five_minute_bar_readers[exchange_name] = \
BcolzFiveMinuteBarReader(
five_minute_path(name, time_folder),
)
except IOError:
self.five_minute_bar_readers[exchange_name] = None
self.minute_bar_readers[exchange_name] = \
BcolzMinuteBarReader(
minute_path(name, time_folder),
)
try:
self.minute_bar_readers[exchange_name] = \
BcolzMinuteBarReader(
minute_path(name, time_folder),
)
except IOError:
self.minute_bar_readers[exchange_name] = None
@staticmethod
def find_most_recent_time(bundle_name):
@@ -309,6 +320,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
field,
data_frequency,
ffill=True):
# TODO: implement in the bundle
df = exchange.get_history_window(
assets,
end_dt,
+40
View File
@@ -35,12 +35,15 @@ from catalyst.exchange.exchange_errors import (
ExchangePortfolioDataError,
ExchangeTransactionError,
OrphanOrderError)
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
ExchangeLimitOrder, ExchangeStopOrder
from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root, \
save_algo_object, get_algo_object, get_algo_folder, get_algo_df, \
save_algo_df
from catalyst.exchange.live_graph_clock import LiveGraphClock
from catalyst.exchange.simple_clock import SimpleClock
from catalyst.exchange.stats_utils import get_pretty_stats
from catalyst.finance.execution import MarketOrder
from catalyst.finance.performance.period import calc_period_stats
from catalyst.gens.tradesimulation import AlgorithmSimulator
from catalyst.utils.api_support import (
@@ -198,6 +201,43 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase):
)
log.info('initialized trading algorithm in backtest mode')
def _calculate_order(self, asset, amount,
limit_price=None, stop_price=None, style=None):
# Raises a ZiplineError if invalid parameters are detected.
self.validate_order_params(asset,
amount,
limit_price,
stop_price,
style)
# Convert deprecated limit_price and stop_price parameters to use
# ExecutionStyle objects.
style = self.__convert_order_params_for_blotter(limit_price,
stop_price,
style)
return amount, style
@staticmethod
def __convert_order_params_for_blotter(limit_price, stop_price, style):
"""
Helper method for converting deprecated limit_price and stop_price
arguments into ExecutionStyle instances.
This function assumes that either style == None or (limit_price,
stop_price) == (None, None).
"""
if style:
assert (limit_price, stop_price) == (None, None)
return style
if limit_price and stop_price:
return ExchangeStopLimitOrder(limit_price, stop_price)
if limit_price:
return ExchangeLimitOrder(limit_price)
if stop_price:
return ExchangeStopOrder(stop_price)
else:
return MarketOrder()
class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
def __init__(self, *args, **kwargs):
+137 -4
View File
@@ -1,8 +1,141 @@
from logbook import Logger
from catalyst.finance.blotter import Blotter
from catalyst.finance.commission import PerShare
from catalyst.finance.slippage import VolumeShareSlippage
from catalyst.finance.commission import PerShare, CommissionModel
from catalyst.finance.slippage import VolumeShareSlippage, SlippageModel, \
LiquidityExceeded
from catalyst.assets._assets import TradingPair
# It seems like we need to accept greate slippage risk in cryptos
# Orders won't often close at Equity levels.
# TODO: consider adjusting dynamically based on trading pair
from catalyst.finance.transaction import Transaction
log = Logger('exchange_blotter')
DEFAULT_SLIPPAGE_SPREAD = 0.02
DEFAULT_MAKER_FEE = 0.001
DEFAULT_TAKER_FEE = 0.002
class TradingPairFeeSchedule(CommissionModel):
"""
Calculates a commission for a transaction based on a per percentage fee.
Parameters
----------
fee : float, optional
The percentage fee.
"""
def __init__(self,
maker_fee=DEFAULT_MAKER_FEE,
taker_fee=DEFAULT_TAKER_FEE):
self.maker_fee = maker_fee
self.taker_fee = taker_fee
def __repr__(self):
return (
'{class_name}(maker_fee={maker_fee}, '
'taker_fee={taker_fee})'
.format(
class_name=self.__class__.__name__,
maker_fee=self.maker_fee,
taker_fee=self.taker_fee,
)
)
def calculate(self, order, transaction):
"""
Calculate the final fee based on the order parameters.
:param order:
:param transaction:
:return float:
The total commission.
"""
cost = abs(transaction.amount) * transaction.price
# Assuming just the taker fee for now
fee = cost * self.taker_fee
return fee
class TradingPairFixedSlippage(SlippageModel):
"""
Model slippage as a fixed spread.
Parameters
----------
spread : float, optional
spread / 2 will be added to buys and subtracted from sells.
"""
def __init__(self, spread=DEFAULT_SLIPPAGE_SPREAD):
super(TradingPairFixedSlippage, self).__init__()
self.spread = spread
def __repr__(self):
return '{class_name}(spread={spread})'.format(
class_name=self.__class__.__name__, spread=self.spread,
)
def simulate(self, data, asset, orders_for_asset):
self._volume_for_bar = 0
volume = data.current(asset, "volume")
if volume == 0:
return
# can use the close price, since we verified there's volume in this
# bar.
price = data.current(asset, "close")
dt = data.current_dt
for order in orders_for_asset:
if order.open_amount == 0:
continue
order.check_triggers(price, dt)
if not order.triggered:
continue
transaction = None
try:
execution_price, execution_volume = \
self.process_order(data, order)
if execution_price is not None:
transaction = Transaction(
asset=order.asset,
amount=abs(execution_volume),
dt=data.current_dt,
price=execution_price,
order_id=order.id
)
except LiquidityExceeded:
break
if transaction:
self._volume_for_bar += abs(transaction.amount)
yield order, transaction
def process_order(self, data, order):
price = data.current(order.asset, 'close')
if order.amount > 0:
# Buy order
adj_price = price * (1 + self.spread)
else:
# Sell order
adj_price = price & (1 - self.spread)
log.debug('added slippage to price: {} => {}'.format(price, adj_price))
return (adj_price, order.amount)
class ExchangeBlotter(Blotter):
def __init__(self, *args, **kwargs):
@@ -12,8 +145,8 @@ class ExchangeBlotter(Blotter):
# We may be able to define more sophisticated models based on the fee
# structure of each exchange.
self.slippage_models = {
TradingPair: VolumeShareSlippage()
TradingPair: TradingPairFixedSlippage()
}
self.commission_models = {
TradingPair: PerShare()
TradingPair: TradingPairFeeSchedule()
}
+1
View File
@@ -64,6 +64,7 @@ def process_bar_data(exchange, assets, writer, data_frequency,
show_progress, start, end):
open_calendar = get_calendar('OPEN')
writer.default_ohlc_ratio = 1000000
writer.calendar = open_calendar
writer.minutes_per_day = 1440
writer.write_metadata = True
+4 -3
View File
@@ -16,12 +16,12 @@ class ExchangeBundleTestCase:
def test_ingest(self):
exchange_name = 'bitfinex'
start = pd.Timestamp.utcnow() - timedelta(days=365)
start = pd.to_datetime('2017-09-01', utc=True)
end = pd.Timestamp.utcnow()
open_calendar = get_calendar('OPEN')
root = data_root(os.environ)
output_dir = '{root}/exchange_{exchange}/test'.format(
output_dir = '{root}/exchange_{exchange}/2017-09-21T05;34;37.274482'.format(
root=root,
exchange=exchange_name
)
@@ -40,7 +40,8 @@ class ExchangeBundleTestCase:
minutes_per_day=1440,
start_session=start_session,
end_session=end,
write_metadata=True
write_metadata=True,
default_ohlc_ratio=1000000
)
ingest = exchange_bundle(