Merge branch 'develop' of github.com:enigmampc/catalyst into develop

This commit is contained in:
Victor Grau Serrat
2018-01-13 06:17:02 -07:00
14 changed files with 319 additions and 102 deletions
+1
View File
@@ -40,6 +40,7 @@ develop-eggs
coverage.xml
htmlcov
nosetests.xml
.python-version
# C Extensions
*.o
+3 -2
View File
@@ -248,7 +248,7 @@ if __name__ == '__main__':
if live:
run_algorithm(
capital_base=0.1,
capital_base=0.01,
initialize=initialize,
handle_data=handle_data,
analyze=analyze,
@@ -257,8 +257,9 @@ if __name__ == '__main__':
algo_namespace=NAMESPACE,
base_currency='btc',
live_graph=False,
simulate_orders=True,
simulate_orders=False,
stats_output=None,
# auth_aliases=dict(poloniex='auth2')
)
else:
+189 -49
View File
@@ -6,6 +6,11 @@ from collections import defaultdict
import ccxt
import pandas as pd
import six
from ccxt import InvalidOrder, NetworkError, \
ExchangeError
from logbook import Logger
from six import string_types
from catalyst.algorithm import MarketOrder
from catalyst.assets._assets import TradingPair
from catalyst.constants import LOG_LEVEL
@@ -13,16 +18,14 @@ from catalyst.exchange.exchange import Exchange
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \
ExchangeSymbolsNotFound, ExchangeRequestError, InvalidOrderStyle, \
ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError
ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError, \
UnsupportedHistoryFrequencyError
from catalyst.exchange.exchange_execution import ExchangeLimitOrder
from catalyst.exchange.utils.exchange_utils import mixin_market_params, \
from_ms_timestamp, get_epoch, get_exchange_folder, get_catalyst_symbol, \
get_exchange_auth
from catalyst.finance.order import Order, ORDER_STATUS
from ccxt import InvalidOrder, NetworkError, \
ExchangeError
from logbook import Logger
from six import string_types
from catalyst.finance.transaction import Transaction
log = Logger('CCXT', level=LOG_LEVEL)
@@ -376,6 +379,14 @@ class CCXT(Exchange):
symbols = self.get_symbols(assets)
timeframe = CCXT.get_timeframe(freq)
if timeframe not in self.api.timeframes:
freqs = [CCXT.get_frequency(t) for t in self.api.timeframes]
raise UnsupportedHistoryFrequencyError(
exchange=self.name,
freq=freq,
freqs=freqs,
)
ms = None
if start_dt is not None:
delta = start_dt - get_epoch()
@@ -705,6 +716,12 @@ class CCXT(Exchange):
else:
adj_amount = abs(amount)
if adj_amount == 0:
raise CreateOrderError(
exchange=self.name,
e='order amount lower than the smallest lot: {}'.format(amount)
)
try:
result = self.api.create_order(
symbol=symbol,
@@ -759,13 +776,111 @@ class CCXT(Exchange):
orders = []
for order_status in result:
order, executed_price = self._create_order(order_status)
order, _ = self._create_order(order_status)
if asset is None or asset == order.sid:
orders.append(order)
return orders
def get_order(self, order_id, asset_or_symbol=None):
def _process_order_fallback(self, order):
"""
Fallback method for exchanges which do not play nice with
fetch-my-trades. Apparently, about 60% of exchanges will return
the correct executed values with this method. Others will support
fetch-my-trades.
Parameters
----------
order: Order
Returns
-------
float
"""
exc_order, price = self.get_order(
order.id, order.asset, return_price=True
)
order.status = exc_order.status
order.commission = exc_order.commission
if order.amount != exc_order.amount:
log.warn(
'executed order amount {} differs '
'from original'.format(
exc_order.amount, order.amount
)
)
order.amount = exc_order.amount
if order.status == ORDER_STATUS.FILLED:
transaction = Transaction(
asset=order.asset,
amount=order.amount,
dt=pd.Timestamp.utcnow(),
price=price,
order_id=order.id,
commission=order.commission
)
return [transaction]
def process_order(self, order):
# TODO: move to parent class after tracking features in the parent
if not self.api.hasFetchMyTrades:
return self._process_order_fallback(order)
try:
all_trades = self.get_trades(order.asset)
except ExchangeRequestError as e:
log.warn(
'unable to fetch account trades, trying an alternate '
'method to find executed order {} / {}: {}'.format(
order.id, order.asset.symbol, e
)
)
return self._process_order_fallback(order)
transactions = []
trades = [t for t in all_trades if t['order'] == order.id]
if not trades:
log.debug(
'order {} / {} not found in trades'.format(
order.id, order.asset.symbol
)
)
return transactions
trades.sort(key=lambda t: t['timestamp'], reverse=False)
order.filled = 0
order.commission = 0
for trade in trades:
# status property will update automatically
filled = trade['amount'] * order.direction
order.filled += filled
commission = 0
if 'fee' in trade and 'cost' in trade['fee']:
commission = trade['fee']['cost']
order.commission += commission
order.check_triggers(
price=trade['price'],
dt=pd.to_datetime(trade['timestamp'], unit='ms', utc=True),
)
transaction = Transaction(
asset=order.asset,
amount=filled,
dt=pd.Timestamp.utcnow(),
price=trade['price'],
order_id=order.id,
commission=commission
)
transactions.append(transaction)
order.broker_order_id = ', '.join([t['id'] for t in trades])
return transactions
def get_order(self, order_id, asset_or_symbol=None, return_price=False):
if asset_or_symbol is None:
log.debug(
'order not found in memory, the request might fail '
@@ -777,6 +892,12 @@ class CCXT(Exchange):
order_status = self.api.fetch_order(id=order_id, symbol=symbol)
order, executed_price = self._create_order(order_status)
if return_price:
return order, executed_price
else:
return order
except (ExchangeError, NetworkError) as e:
log.warn(
'unable to fetch order {} / {}: {}'.format(
@@ -785,8 +906,6 @@ class CCXT(Exchange):
)
raise ExchangeRequestError(error=e)
return order, executed_price
def cancel_order(self, order_param, asset_or_symbol=None):
order_id = order_param.id \
if isinstance(order_param, Order) else order_param
@@ -822,48 +941,45 @@ class CCXT(Exchange):
list[dict[str, float]
"""
tickers = dict()
try:
for asset in assets:
symbol = self.get_symbol(asset)
# TODO: use fetch_tickers() for efficiency
# I tried using fetch_tickers() but noticed some
# inconsistencies, see issue:
# https://github.com/ccxt/ccxt/issues/870
tickers = {}
for asset in assets:
symbol = self.get_symbol(asset)
self.ask_request()
# TODO: use fetch_tickers() for efficiency
# I tried using fetch_tickers() but noticed some
# inconsistencies, see issue:
# https://github.com/ccxt/ccxt/issues/870
try:
ticker = self.api.fetch_ticker(symbol=symbol)
if not ticker:
log.warn('ticker not found for {} {}'.format(
self.name, symbol
))
continue
ticker['last_traded'] = from_ms_timestamp(ticker['timestamp'])
if 'last_price' not in ticker:
# TODO: any more exceptions?
ticker['last_price'] = ticker['last']
if 'baseVolume' in ticker and ticker['baseVolume'] is not None:
# Using the volume represented in the base currency
ticker['volume'] = ticker['baseVolume']
elif 'info' in ticker and 'bidQty' in ticker['info'] \
and 'askQty' in ticker['info']:
ticker['volume'] = float(ticker['info']['bidQty']) + \
float(ticker['info']['askQty'])
else:
ticker['volume'] = 0
tickers[asset] = ticker
except (ExchangeError, NetworkError) as e:
log.warn(
'unable to fetch ticker {} / {}: {}'.format(
self.name, asset.symbol, e
except (ExchangeError, NetworkError) as e:
log.warn(
'unable to fetch ticker {} / {}: {}'.format(
self.name, asset.symbol, e
)
)
)
raise ExchangeRequestError(error=e)
continue
ticker['last_traded'] = from_ms_timestamp(ticker['timestamp'])
if 'last_price' not in ticker:
# TODO: any more exceptions?
ticker['last_price'] = ticker['last']
if 'baseVolume' in ticker and ticker['baseVolume'] is not None:
# Using the volume represented in the base currency
ticker['volume'] = ticker['baseVolume']
elif 'info' in ticker and 'bidQty' in ticker['info'] \
and 'askQty' in ticker['info']:
ticker['volume'] = float(ticker['info']['bidQty']) + \
float(ticker['info']['askQty'])
else:
ticker['volume'] = 0
tickers[asset] = ticker
return tickers
@@ -893,3 +1009,27 @@ class CCXT(Exchange):
))
return result
def get_trades(self, asset, my_trades=True, start_dt=None, limit=None):
if not my_trades:
raise NotImplemented(
'get_trades only supports "my trades"'
)
# TODO: is it possible to sort this? Limit is useless otherwise.
ccxt_symbol = self.get_symbol(asset)
try:
trades = self.api.fetch_my_trades(
symbol=ccxt_symbol,
since=start_dt,
limit=limit,
)
except (ExchangeError, NetworkError) as e:
log.warn(
'unable to fetch trades {} / {}: {}'.format(
self.name, asset.symbol, e
)
)
raise ExchangeRequestError(error=e)
return trades
+34 -1
View File
@@ -899,6 +899,22 @@ class Exchange:
"""
pass
@abstractmethod
def process_order(self, order):
"""
Similar to get_order but looks only for executed orders.
Parameters
----------
order: Order
Returns
-------
float
Avg execution price
"""
@abstractmethod
def cancel_order(self, order_param, symbol_or_asset=None):
"""Cancel an open order.
@@ -979,7 +995,7 @@ class Exchange:
@abc.abstractmethod
def get_orderbook(self, asset, order_type, limit):
"""
Retrieve the the orderbook for the given trading pair.
Retrieve the orderbook for the given trading pair.
Parameters
----------
@@ -993,3 +1009,20 @@ class Exchange:
list[dict[str, float]
"""
pass
@abc.abstractmethod
def get_trades(self, asset, my_trades, start_dt, limit):
"""
Retrieve a list of trades.
Parameters
----------
my_trades: bool
List only my trades.
start_dt
limit
Returns
-------
"""
+30 -36
View File
@@ -1,4 +1,8 @@
import numpy as np
import pandas as pd
from logbook import Logger
from redo import retry
from catalyst.assets._assets import TradingPair
from catalyst.constants import LOG_LEVEL
from catalyst.exchange.exchange_errors import ExchangeRequestError
@@ -8,8 +12,6 @@ from catalyst.finance.order import ORDER_STATUS
from catalyst.finance.slippage import SlippageModel
from catalyst.finance.transaction import create_transaction, Transaction
from catalyst.utils.input_validation import expect_types
from logbook import Logger
from redo import retry
log = Logger('exchange_blotter', level=LOG_LEVEL)
@@ -93,7 +95,6 @@ class TradingPairFixedSlippage(SlippageModel):
def simulate(self, data, asset, orders_for_asset):
self._volume_for_bar = 0
price = data.current(asset, 'close')
dt = data.current_dt
@@ -103,18 +104,20 @@ class TradingPairFixedSlippage(SlippageModel):
order.check_triggers(price, dt)
if not order.triggered:
log.debug('order has not reached the trigger at current '
'price {}'.format(price))
log.info(
'order has not reached the trigger at current '
'price {}'.format(price)
)
continue
execution_price, execution_volume = self.process_order(data, order)
if execution_price is not None:
transaction = create_transaction(
order, dt, execution_price, execution_volume
)
transaction = create_transaction(
order, dt, execution_price, execution_volume
)
self._volume_for_bar += abs(transaction.amount)
yield order, transaction
self._volume_for_bar += abs(transaction.amount)
yield order, transaction
def process_order(self, data, order):
price = data.current(order.asset, 'close')
@@ -205,34 +208,25 @@ class ExchangeBlotter(Blotter):
for order in self.open_orders[asset]:
log.debug('found open order: {}'.format(order.id))
new_order, executed_price = exchange.get_order(order.id, asset)
log.debug(
'got updated order {} {}'.format(
new_order, executed_price
transactions = exchange.process_order(order)
# TODO: not letting partial orders through because of calculation issues
if transactions and order.status == ORDER_STATUS.FILLED:
avg_price = np.average(
a=[t.price for t in transactions],
weights=[t.amount for t in transactions],
)
)
order.status = new_order.status
if order.status == ORDER_STATUS.FILLED:
order.commission = new_order.commission
if order.amount != new_order.amount:
log.warn(
'executed order amount {} differs '
'from original'.format(
new_order.amount, order.amount
)
ostatus = 'filled' if order.open_amount == 0 else 'partial'
log.info(
'{} order {} / {}: {}, avg price: {}'.format(
ostatus,
order.id,
asset.symbol,
order.filled,
avg_price,
)
order.amount = new_order.amount
transaction = Transaction(
asset=order.asset,
amount=order.amount,
dt=pd.Timestamp.utcnow(),
price=executed_price,
order_id=order.id,
commission=order.commission
)
yield order, transaction
for transaction in transactions:
yield order, transaction
elif order.status == ORDER_STATUS.CANCELLED:
yield order, None
@@ -291,6 +291,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
DataFrame
"""
# TODO: verify that the exchange supports the timeframe
bundle = self.exchange_bundles[exchange_name] # type: ExchangeBundle
freq, candle_size, unit, adj_data_frequency = get_frequency(
+7
View File
@@ -100,6 +100,13 @@ class InvalidHistoryFrequencyError(ZiplineError):
).strip()
class UnsupportedHistoryFrequencyError(ZiplineError):
msg = (
'{exchange} does not support candle frequency {freq}, please choose '
'from: {freqs}.'
).strip()
class InvalidHistoryTimeframeError(ZiplineError):
msg = (
'CCXT timeframe {timeframe} not supported by the exchange.'
+3 -2
View File
@@ -192,7 +192,7 @@ def get_symbols_string(assets):
return ', '.join([asset.symbol for asset in array])
def get_exchange_auth(exchange_name, environ=None):
def get_exchange_auth(exchange_name, alias=None, environ=None):
"""
The de-serialized contend of the exchange's auth.json file.
@@ -207,7 +207,8 @@ def get_exchange_auth(exchange_name, environ=None):
"""
exchange_folder = get_exchange_folder(exchange_name, environ)
filename = os.path.join(exchange_folder, 'auth.json')
name = 'auth' if alias is None else alias
filename = os.path.join(exchange_folder, '{}.json'.format(name))
if os.path.isfile(filename):
with open(filename) as data_file:
+2 -2
View File
@@ -13,12 +13,12 @@ exchange_cache = dict()
def get_exchange(exchange_name, base_currency=None, must_authenticate=False,
skip_init=False):
skip_init=False, auth_alias=None):
key = (exchange_name, base_currency)
if key in exchange_cache:
return exchange_cache[key]
exchange_auth = get_exchange_auth(exchange_name)
exchange_auth = get_exchange_auth(exchange_name, alias=auth_alias)
has_auth = (exchange_auth['key'] != '' and exchange_auth['secret'] != '')
if must_authenticate and not has_auth:
+7 -3
View File
@@ -359,9 +359,13 @@ def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None,
pid = os.getpid()
parts = uri.split('//')
obj = s3.Object(parts[1], '{}/{}-{}-{}.csv'.format(
folder, timestr, algo_namespace, pid
))
path = '{folder}/{algo}/{time}-{algo}-{pid}.csv'.format(
folder=folder,
algo=algo_namespace,
time=timestr,
pid=pid,
)
obj = s3.Object(parts[1], path)
obj.put(Body=bytes_to_write)
+12 -4
View File
@@ -91,6 +91,7 @@ def _run(handle_data,
live_graph,
analyze_live,
simulate_orders,
auth_aliases,
stats_output):
"""Run a backtest for the given algorithm.
@@ -163,14 +164,19 @@ def _run(handle_data,
raise ValueError('Please specify at least one exchange.')
exchange_list = [x.strip().lower() for x in exchange.split(',')]
exchanges = dict()
for exchange_name in exchange_list:
exchanges[exchange_name] = get_exchange(
exchange_name=exchange_name,
for name in exchange_list:
if auth_aliases is not None and name in auth_aliases:
auth_alias = auth_aliases[name]
else:
auth_alias = None
exchanges[name] = get_exchange(
exchange_name=name,
base_currency=base_currency,
must_authenticate=(live and not simulate_orders),
skip_init=True,
auth_alias=auth_alias,
)
open_calendar = get_calendar('OPEN')
@@ -391,6 +397,7 @@ def run_algorithm(initialize,
live_graph=False,
analyze_live=None,
simulate_orders=True,
auth_aliases=None,
stats_output=None,
output=os.devnull):
"""Run a trading algorithm.
@@ -524,5 +531,6 @@ def run_algorithm(initialize,
live_graph=live_graph,
analyze_live=analyze_live,
simulate_orders=simulate_orders,
auth_aliases=auth_aliases,
stats_output=stats_output
)
+11
View File
@@ -2,6 +2,17 @@
Release Notes
=============
Version 0.4.5
^^^^^^^^^^^^^
**Release Date**: 2018-01-12
Bug Fixes
~~~~~~~~~
- Improved order execution for exchanges supporting trade lists (:issue:`151`)
- Fixed an issue where requesting history of multiple assets repeats values
- Raising an error for order amounts smaller than exchange lots
- Handling multiple req errors with tickers more gracefully (:issue:`160`)
Version 0.4.4
^^^^^^^^^^^^^
**Release Date**: 2018-01-09
+17 -1
View File
@@ -1,7 +1,7 @@
import pandas as pd
from logbook import Logger
from base import BaseExchangeTestCase
from .base import BaseExchangeTestCase
from catalyst.exchange.ccxt.ccxt_exchange import CCXT
from catalyst.exchange.exchange_execution import ExchangeLimitOrder
from catalyst.exchange.utils.exchange_utils import get_exchange_auth
@@ -76,6 +76,22 @@ class TestCCXT(BaseExchangeTestCase):
assert len(tickers) == 1
pass
def test_my_trades(self):
asset = self.exchange.get_asset('eng_eth')
trades = self.exchange.get_trades(asset)
assert trades
pass
def test_get_executed_order(self):
log.info('retrieving executed order')
asset = self.exchange.get_asset('eng_eth')
order = self.exchange.get_order('165784', asset)
transactions = self.exchange.process_order(order)
assert transactions
pass
def test_get_balances(self):
log.info('testing wallet balances')
# balances = self.exchange.get_balances()
@@ -184,13 +184,13 @@ class TestSuiteExchange(WithLogger, ZiplineTestCase):
)
sleep(1)
open_order, _ = exchange.get_order(order.id, asset)
open_order = exchange.get_order(order.id, asset)
self.assertEqual(0, open_order.status)
exchange.cancel_order(open_order, asset)
sleep(1)
canceled_order, _ = exchange.get_order(open_order.id, asset)
canceled_order = exchange.get_order(open_order.id, asset)
warnings = [record for record in log_catcher.records if
record.level == WARNING]