BLD: making some adjustment to the blotter to improve paper trading

This commit is contained in:
fredfortier
2017-12-05 22:08:30 -05:00
parent 96a27d083c
commit dcdf4f77db
9 changed files with 218 additions and 126 deletions
+3 -2
View File
@@ -256,7 +256,7 @@ if __name__ == '__main__':
initialize=initialize,
handle_data=handle_data,
analyze=analyze,
exchange_name='bitfinex',
exchange_name='bittrex',
algo_namespace=NAMESPACE,
base_currency='eth',
start=pd.to_datetime('2017-10-01', utc=True),
@@ -275,5 +275,6 @@ if __name__ == '__main__':
live=True,
algo_namespace=NAMESPACE,
base_currency='eth',
live_graph=False
live_graph=False,
simulate_orders=True
)
+36 -20
View File
@@ -3,6 +3,7 @@ from collections import defaultdict
import ccxt
import pandas as pd
import six
from ccxt import ExchangeNotAvailable
from six import string_types
@@ -19,7 +20,7 @@ from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \
ExchangeSymbolsNotFound, ExchangeRequestError, InvalidOrderStyle, \
ExchangeNotFoundError
from catalyst.exchange.exchange_utils import mixin_market_params, \
from_ms_timestamp
from_ms_timestamp, get_epoch
log = Logger('CCXT', level=LOG_LEVEL)
@@ -182,33 +183,48 @@ class CCXT(Exchange):
def get_candles(self, freq, assets, bar_count=None, start_dt=None,
end_dt=None):
is_single = (isinstance(assets, TradingPair))
if is_single:
assets = [assets]
symbols = self.get_symbols(assets)
timeframe = self.get_timeframe(freq)
delta = start_dt - pd.to_datetime('1970-1-1', utc=True)
delta = start_dt - get_epoch()
ms = int(delta.total_seconds()) * 1000
candles = dict()
for asset in assets:
ohlcvs = self.api.fetch_ohlcv(
symbol=symbols[0],
timeframe=timeframe,
since=ms,
limit=bar_count,
params={}
)
try:
ohlcvs = self.api.fetch_ohlcv(
symbol=symbols[0],
timeframe=timeframe,
since=ms,
limit=bar_count,
params={}
)
candles[asset] = []
for ohlcv in ohlcvs:
candles[asset].append(dict(
last_traded=pd.to_datetime(ohlcv[0], unit='ms', utc=True),
open=ohlcv[1],
high=ohlcv[2],
low=ohlcv[3],
close=ohlcv[4],
volume=ohlcv[5]
))
candles[asset] = []
for ohlcv in ohlcvs:
candles[asset].append(dict(
last_traded=pd.to_datetime(
ohlcv[0], unit='ms', utc=True
),
open=ohlcv[1],
high=ohlcv[2],
low=ohlcv[3],
close=ohlcv[4],
volume=ohlcv[5]
))
return candles
except Exception as e:
raise ExchangeRequestError(error=e)
if is_single:
return six.next(six.itervalues(candles))
else:
return candles
def _fetch_symbol_map(self, is_local):
try:
+4 -5
View File
@@ -324,7 +324,6 @@ class Exchange:
list[Transaction]
"""
transactions = list()
if self.portfolio.open_orders:
for order_id in list(self.portfolio.open_orders):
log.debug('found open order: {}'.format(order_id))
@@ -344,12 +343,13 @@ class Exchange:
order_id=order.id,
commission=order.commission
)
transactions.append(transaction)
yield order, transaction
self.portfolio.execute_order(order, transaction)
# self.portfolio.execute_order(order, transaction)
elif order.status == ORDER_STATUS.CANCELLED:
self.portfolio.remove_order(order)
# self.portfolio.remove_order(order)
yield order, None
else:
delta = pd.Timestamp.utcnow() - order.dt
@@ -359,7 +359,6 @@ class Exchange:
delta=delta
)
)
return transactions
def get_spot_value(self, assets, field, dt=None, data_frequency='minute'):
"""
+97 -95
View File
@@ -32,7 +32,7 @@ from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangePortfolioDataError,
ExchangeTransactionError,
OrphanOrderError)
OrphanOrderError, OrderTypeNotSupported)
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
ExchangeLimitOrder, ExchangeStopOrder
from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object, \
@@ -63,9 +63,72 @@ class ExchangeAlgorithmExecutor(AlgorithmSimulator):
class ExchangeTradingAlgorithmBase(TradingAlgorithm):
def __init__(self, *args, **kwargs):
self.exchanges = kwargs.pop('exchanges', None)
self.simulate_orders = kwargs.pop('simulate_orders', None)
super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs)
if self.simulate_orders is None \
and self.sim_params.arena == 'backtest':
self.simulate_orders = True
self.blotter = ExchangeBlotter(
data_frequency=self.data_frequency,
# Default to NeverCancel in catalyst
cancel_policy=self.cancel_policy,
simulate_orders=self.simulate_orders
)
@staticmethod
def __convert_order_params_for_blotter(limit_price, stop_price, style):
"""
Helper method for converting deprecated limit_price and stop_price
arguments into ExecutionStyle instances.
This function assumes that either style == None or (limit_price,
stop_price) == (None, None).
"""
if stop_price:
raise OrderTypeNotSupported(order_type='stop')
if style:
if limit_price is not None:
raise ValueError(
'An order style and a limit price was included in the '
'order. Please pick one to avoid any possible conflict.'
)
# Currently limiting order types or limit and market to
# be in-line with CXXT and many exchanges. We'll consider
# adding more order types in the future.
if not isinstance(style, ExchangeLimitOrder) or \
not isinstance(style, MarketOrder):
raise OrderTypeNotSupported(
order_type=style.__class__.__name__
)
return style
if limit_price:
return ExchangeLimitOrder(limit_price)
else:
return MarketOrder()
def _calculate_order(self, asset, amount,
limit_price=None, stop_price=None, style=None):
# Raises a ZiplineError if invalid parameters are detected.
self.validate_order_params(asset,
amount,
limit_price,
stop_price,
style)
# Convert deprecated limit_price and stop_price parameters to use
# ExecutionStyle objects.
style = self.__convert_order_params_for_blotter(limit_price,
stop_price,
style)
return amount, style
def round_order(self, amount, asset):
"""
We need fractions with cryptocurrencies
@@ -204,50 +267,8 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase):
super(ExchangeTradingAlgorithmBacktest, self).__init__(*args, **kwargs)
self.frame_stats = list()
self.blotter = ExchangeBlotter(
data_frequency=self.data_frequency,
# Default to NeverCancel in catalyst
cancel_policy=self.cancel_policy,
)
log.info('initialized trading algorithm in backtest mode')
def _calculate_order(self, asset, amount,
limit_price=None, stop_price=None, style=None):
# Raises a ZiplineError if invalid parameters are detected.
self.validate_order_params(asset,
amount,
limit_price,
stop_price,
style)
# Convert deprecated limit_price and stop_price parameters to use
# ExecutionStyle objects.
style = self.__convert_order_params_for_blotter(limit_price,
stop_price,
style)
return amount, style
@staticmethod
def __convert_order_params_for_blotter(limit_price, stop_price, style):
"""
Helper method for converting deprecated limit_price and stop_price
arguments into ExecutionStyle instances.
This function assumes that either style == None or (limit_price,
stop_price) == (None, None).
"""
if style:
assert (limit_price, stop_price) == (None, None)
return style
if limit_price and stop_price:
return ExchangeStopLimitOrder(limit_price, stop_price)
if limit_price:
return ExchangeLimitOrder(limit_price)
if stop_price:
return ExchangeStopOrder(stop_price)
else:
return MarketOrder()
def is_last_frame_of_day(self, data):
# TODO: adjust here to support more intervals
next_frame_dt = data.current_dt + timedelta(minutes=1)
@@ -289,7 +310,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
def __init__(self, *args, **kwargs):
self.algo_namespace = kwargs.pop('algo_namespace', None)
self.live_graph = kwargs.pop('live_graph', None)
self.simulate_orders = kwargs.pop('simulate_orders', None)
self._clock = None
self.frame_stats = deque(maxlen=60)
@@ -416,16 +436,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
return self.trading_client.transform()
def updated_portfolio(self):
"""
We skip the entire performance tracker business and update the
portfolio directly.
Returns
-------
ExchangePortfolio
"""
# TODO: build cumulative portfolio
return self.perf_tracker.get_portfolio(False)
def updated_account(self):
@@ -450,6 +460,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
last_sale_date=position.last_sale_date,
last_sale_price=position.last_sale_price
)
except ExchangeRequestError as e:
log.warn(
'update portfolio attempt {}: {}'.format(attempt_index, e)
@@ -464,30 +475,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
error=e
)
def _check_open_orders(self, attempt_index=0):
try:
orders = list()
for exchange_name in self.exchanges:
exchange = self.exchanges[exchange_name]
exchange_orders = exchange.check_open_orders()
orders += exchange_orders
return orders
except ExchangeRequestError as e:
log.warn(
'check open orders attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_check_open_orders:
sleep(self.retry_delay)
return self._check_open_orders(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='order-status',
attempts=attempt_index,
error=e
)
def add_pnl_stats(self, period_stats):
"""
Save p&l stats.
@@ -577,14 +564,19 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
if not self.is_running:
return
self._synchronize_portfolio()
new_transactions, new_commissions, closed_orders = \
self.blotter.get_transactions(data)
transactions = self._check_open_orders()
if len(transactions) > 0:
for transaction in transactions:
self.perf_tracker.process_transaction(transaction)
self.blotter.prune_orders(closed_orders)
self.perf_tracker.update_performance()
for transaction in new_transactions:
self.perf_tracker.process_transaction(transaction)
# since this order was modified, record it
order = self.blotter.orders[transaction.order_id]
self.perf_tracker.process_order(order)
self.perf_tracker.update_performance()
if self._handle_data:
self._handle_data(self, data)
@@ -713,25 +705,35 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
order: Order
The catalyst order object or None
"""
amount, style = self._calculate_order(asset, amount,
limit_price, stop_price,
style)
order_id = self._order(asset, amount, limit_price, stop_price, style)
if self.simulate_orders:
order_id = super(ExchangeTradingAlgorithmLive, self).order(
asset, amount, limit_price, stop_price, style
)
log.debug('created a simulated order {}'.format(order_id))
else:
amount, style = self._calculate_order(
asset, amount, limit_price, stop_price, style
)
order_id = self._order(
asset, amount, limit_price, stop_price, style
)
exchange = self.exchanges[asset.exchange]
exchange_portfolio = exchange.portfolio
if order_id is not None:
current_order = None
for order in self.blotter.open_orders[asset]:
if current_order is None and order.id == order_id:
self.perf_tracker.process_order(order)
current_order = order
if order_id in exchange_portfolio.open_orders:
order = exchange_portfolio.open_orders[order_id]
self.perf_tracker.process_order(order)
return order
if current_order is not None:
return current_order
else:
raise OrphanOrderError(
order_id=order_id,
exchange=exchange.name
exchange=asset.exchange
)
else:
log.warn('unable to order {} {} on exchange {}'.format(
+46
View File
@@ -1,7 +1,11 @@
from time import sleep
from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.constants import LOG_LEVEL
from catalyst.exchange.exchange_errors import ExchangeRequestError, \
ExchangePortfolioDataError
from catalyst.finance.blotter import Blotter
from catalyst.finance.commission import CommissionModel
from catalyst.finance.slippage import SlippageModel
@@ -121,6 +125,8 @@ class TradingPairFixedSlippage(SlippageModel):
class ExchangeBlotter(Blotter):
def __init__(self, *args, **kwargs):
self.simulate_orders = kwargs.pop('simulate_orders', False)
super(ExchangeBlotter, self).__init__(*args, **kwargs)
# Using the equity models for now
@@ -132,3 +138,43 @@ class ExchangeBlotter(Blotter):
self.commission_models = {
TradingPair: TradingPairFeeSchedule()
}
def get_exchange_transactions(self, attempt_index=0):
closed_orders = []
transactions = []
commissions = []
try:
for exchange_name in self.exchanges:
exchange = self.exchanges[exchange_name]
for order, txn in exchange.check_open_orders():
order.dt = txn.dt
transactions.append(txn)
if not order.open:
closed_orders.append(order)
return transactions, commissions, closed_orders
except ExchangeRequestError as e:
log.warn(
'check open orders attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_check_open_orders:
sleep(self.retry_delay)
return self.get_exchange_transactions(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='order-status',
attempts=attempt_index,
error=e
)
def get_transactions(self, bar_data):
if self.simulate_orders:
return super(ExchangeBlotter, self).get_transactions(bar_data)
else:
return self.get_exchange_transactions()
+7
View File
@@ -217,6 +217,7 @@ class PricingDataNotLoadedError(ZiplineError):
'{data_frequency} -i {symbol_list}`. See catalyst documentation '
'for details.').strip()
class PricingDataValueError(ZiplineError):
msg = ('Unable to retrieve pricing data for {exchange} {symbol} '
'[{start_dt} - {end_dt}]: {error}').strip()
@@ -244,3 +245,9 @@ class NoDataAvailableOnExchange(ZiplineError):
class NoValueForField(ZiplineError):
msg = ('Value not found for field: {field}.').strip()
class OrderTypeNotSupported(ZiplineError):
msg = (
'Order type `{order_type}` not currencly supported by Catalyst. '
'Please use `limit` or `market` orders only.').strip()
+20 -3
View File
@@ -40,7 +40,13 @@ class ExchangePortfolio(Portfolio):
"""
log.debug('creating order {}'.format(order.id))
self.open_orders[order.id] = order
open_orders = self.open_orders[order.asset] \
if order.asset is self.open_orders else []
open_orders.append(order)
self.open_orders[order.asset] = open_orders
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
@@ -52,6 +58,17 @@ class ExchangePortfolio(Portfolio):
order_position.amount += order.amount
log.debug('open order added to portfolio')
def _remove_open_order(self, order):
try:
open_orders = self.open_orders[order.asset]
if order in open_orders:
open_orders.remove(order)
except Exception:
raise ValueError(
'unable to clear order not found in open order list.'
)
def execute_order(self, order, transaction):
"""
Update the open orders and positions to apply an executed order.
@@ -66,7 +83,7 @@ class ExchangePortfolio(Portfolio):
"""
log.debug('executing order {}'.format(order.id))
del self.open_orders[order.id]
self._remove_open_order(order)
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
@@ -99,7 +116,7 @@ class ExchangePortfolio(Portfolio):
"""
log.info('removing cancelled order {}'.format(order.id))
del self.open_orders[order.id]
self._remove_open_order(order)
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
+4
View File
@@ -627,3 +627,7 @@ def mixin_market_params(exchange_name, params, market):
def from_ms_timestamp(ms):
return pd.to_datetime(ms, unit='ms', utc=True)
def get_epoch():
return pd.to_datetime('1970-1-1', utc=True)
+1 -1
View File
@@ -81,5 +81,5 @@ empyrical==0.2.1
tables==3.3.0
#Catalyst dependencies
ccxt==1.10.251
ccxt==1.10.283