diff --git a/.gitignore b/.gitignore index ff13509b..c40a235c 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ develop-eggs coverage.xml htmlcov nosetests.xml +.python-version # C Extensions *.o diff --git a/catalyst/examples/mean_reversion_simple.py b/catalyst/examples/mean_reversion_simple.py index bf59bf35..084411d9 100644 --- a/catalyst/examples/mean_reversion_simple.py +++ b/catalyst/examples/mean_reversion_simple.py @@ -248,7 +248,7 @@ if __name__ == '__main__': if live: run_algorithm( - capital_base=0.1, + capital_base=0.01, initialize=initialize, handle_data=handle_data, analyze=analyze, @@ -257,8 +257,9 @@ if __name__ == '__main__': algo_namespace=NAMESPACE, base_currency='btc', live_graph=False, - simulate_orders=True, + simulate_orders=False, stats_output=None, + # auth_aliases=dict(poloniex='auth2') ) else: diff --git a/catalyst/exchange/ccxt/ccxt_exchange.py b/catalyst/exchange/ccxt/ccxt_exchange.py index bade7221..164d5476 100644 --- a/catalyst/exchange/ccxt/ccxt_exchange.py +++ b/catalyst/exchange/ccxt/ccxt_exchange.py @@ -6,6 +6,11 @@ from collections import defaultdict import ccxt import pandas as pd import six +from ccxt import InvalidOrder, NetworkError, \ + ExchangeError +from logbook import Logger +from six import string_types + from catalyst.algorithm import MarketOrder from catalyst.assets._assets import TradingPair from catalyst.constants import LOG_LEVEL @@ -13,16 +18,14 @@ from catalyst.exchange.exchange import Exchange from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ ExchangeSymbolsNotFound, ExchangeRequestError, InvalidOrderStyle, \ - ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError + ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError, \ + UnsupportedHistoryFrequencyError from catalyst.exchange.exchange_execution import ExchangeLimitOrder from catalyst.exchange.utils.exchange_utils import mixin_market_params, \ from_ms_timestamp, get_epoch, get_exchange_folder, get_catalyst_symbol, \ get_exchange_auth from catalyst.finance.order import Order, ORDER_STATUS -from ccxt import InvalidOrder, NetworkError, \ - ExchangeError -from logbook import Logger -from six import string_types +from catalyst.finance.transaction import Transaction log = Logger('CCXT', level=LOG_LEVEL) @@ -376,6 +379,14 @@ class CCXT(Exchange): symbols = self.get_symbols(assets) timeframe = CCXT.get_timeframe(freq) + if timeframe not in self.api.timeframes: + freqs = [CCXT.get_frequency(t) for t in self.api.timeframes] + raise UnsupportedHistoryFrequencyError( + exchange=self.name, + freq=freq, + freqs=freqs, + ) + ms = None if start_dt is not None: delta = start_dt - get_epoch() @@ -705,6 +716,12 @@ class CCXT(Exchange): else: adj_amount = abs(amount) + if adj_amount == 0: + raise CreateOrderError( + exchange=self.name, + e='order amount lower than the smallest lot: {}'.format(amount) + ) + try: result = self.api.create_order( symbol=symbol, @@ -759,13 +776,111 @@ class CCXT(Exchange): orders = [] for order_status in result: - order, executed_price = self._create_order(order_status) + order, _ = self._create_order(order_status) if asset is None or asset == order.sid: orders.append(order) return orders - def get_order(self, order_id, asset_or_symbol=None): + def _process_order_fallback(self, order): + """ + Fallback method for exchanges which do not play nice with + fetch-my-trades. Apparently, about 60% of exchanges will return + the correct executed values with this method. Others will support + fetch-my-trades. + + Parameters + ---------- + order: Order + + Returns + ------- + float + + """ + exc_order, price = self.get_order( + order.id, order.asset, return_price=True + ) + order.status = exc_order.status + + order.commission = exc_order.commission + if order.amount != exc_order.amount: + log.warn( + 'executed order amount {} differs ' + 'from original'.format( + exc_order.amount, order.amount + ) + ) + order.amount = exc_order.amount + + if order.status == ORDER_STATUS.FILLED: + transaction = Transaction( + asset=order.asset, + amount=order.amount, + dt=pd.Timestamp.utcnow(), + price=price, + order_id=order.id, + commission=order.commission + ) + return [transaction] + + def process_order(self, order): + # TODO: move to parent class after tracking features in the parent + if not self.api.hasFetchMyTrades: + return self._process_order_fallback(order) + + try: + all_trades = self.get_trades(order.asset) + except ExchangeRequestError as e: + log.warn( + 'unable to fetch account trades, trying an alternate ' + 'method to find executed order {} / {}: {}'.format( + order.id, order.asset.symbol, e + ) + ) + return self._process_order_fallback(order) + + transactions = [] + trades = [t for t in all_trades if t['order'] == order.id] + if not trades: + log.debug( + 'order {} / {} not found in trades'.format( + order.id, order.asset.symbol + ) + ) + return transactions + + trades.sort(key=lambda t: t['timestamp'], reverse=False) + order.filled = 0 + order.commission = 0 + for trade in trades: + # status property will update automatically + filled = trade['amount'] * order.direction + order.filled += filled + + commission = 0 + if 'fee' in trade and 'cost' in trade['fee']: + commission = trade['fee']['cost'] + order.commission += commission + + order.check_triggers( + price=trade['price'], + dt=pd.to_datetime(trade['timestamp'], unit='ms', utc=True), + ) + transaction = Transaction( + asset=order.asset, + amount=filled, + dt=pd.Timestamp.utcnow(), + price=trade['price'], + order_id=order.id, + commission=commission + ) + transactions.append(transaction) + + order.broker_order_id = ', '.join([t['id'] for t in trades]) + return transactions + + def get_order(self, order_id, asset_or_symbol=None, return_price=False): if asset_or_symbol is None: log.debug( 'order not found in memory, the request might fail ' @@ -777,6 +892,12 @@ class CCXT(Exchange): order_status = self.api.fetch_order(id=order_id, symbol=symbol) order, executed_price = self._create_order(order_status) + if return_price: + return order, executed_price + + else: + return order + except (ExchangeError, NetworkError) as e: log.warn( 'unable to fetch order {} / {}: {}'.format( @@ -785,8 +906,6 @@ class CCXT(Exchange): ) raise ExchangeRequestError(error=e) - return order, executed_price - def cancel_order(self, order_param, asset_or_symbol=None): order_id = order_param.id \ if isinstance(order_param, Order) else order_param @@ -822,48 +941,45 @@ class CCXT(Exchange): list[dict[str, float] """ - tickers = dict() - try: - for asset in assets: - symbol = self.get_symbol(asset) - # TODO: use fetch_tickers() for efficiency - # I tried using fetch_tickers() but noticed some - # inconsistencies, see issue: - # https://github.com/ccxt/ccxt/issues/870 + tickers = {} + for asset in assets: + symbol = self.get_symbol(asset) + + self.ask_request() + + # TODO: use fetch_tickers() for efficiency + # I tried using fetch_tickers() but noticed some + # inconsistencies, see issue: + # https://github.com/ccxt/ccxt/issues/870 + try: ticker = self.api.fetch_ticker(symbol=symbol) - if not ticker: - log.warn('ticker not found for {} {}'.format( - self.name, symbol - )) - continue - - ticker['last_traded'] = from_ms_timestamp(ticker['timestamp']) - - if 'last_price' not in ticker: - # TODO: any more exceptions? - ticker['last_price'] = ticker['last'] - - if 'baseVolume' in ticker and ticker['baseVolume'] is not None: - # Using the volume represented in the base currency - ticker['volume'] = ticker['baseVolume'] - - elif 'info' in ticker and 'bidQty' in ticker['info'] \ - and 'askQty' in ticker['info']: - ticker['volume'] = float(ticker['info']['bidQty']) + \ - float(ticker['info']['askQty']) - - else: - ticker['volume'] = 0 - - tickers[asset] = ticker - - except (ExchangeError, NetworkError) as e: - log.warn( - 'unable to fetch ticker {} / {}: {}'.format( - self.name, asset.symbol, e + except (ExchangeError, NetworkError) as e: + log.warn( + 'unable to fetch ticker {} / {}: {}'.format( + self.name, asset.symbol, e + ) ) - ) - raise ExchangeRequestError(error=e) + continue + + ticker['last_traded'] = from_ms_timestamp(ticker['timestamp']) + + if 'last_price' not in ticker: + # TODO: any more exceptions? + ticker['last_price'] = ticker['last'] + + if 'baseVolume' in ticker and ticker['baseVolume'] is not None: + # Using the volume represented in the base currency + ticker['volume'] = ticker['baseVolume'] + + elif 'info' in ticker and 'bidQty' in ticker['info'] \ + and 'askQty' in ticker['info']: + ticker['volume'] = float(ticker['info']['bidQty']) + \ + float(ticker['info']['askQty']) + + else: + ticker['volume'] = 0 + + tickers[asset] = ticker return tickers @@ -893,3 +1009,27 @@ class CCXT(Exchange): )) return result + + def get_trades(self, asset, my_trades=True, start_dt=None, limit=None): + if not my_trades: + raise NotImplemented( + 'get_trades only supports "my trades"' + ) + + # TODO: is it possible to sort this? Limit is useless otherwise. + ccxt_symbol = self.get_symbol(asset) + try: + trades = self.api.fetch_my_trades( + symbol=ccxt_symbol, + since=start_dt, + limit=limit, + ) + except (ExchangeError, NetworkError) as e: + log.warn( + 'unable to fetch trades {} / {}: {}'.format( + self.name, asset.symbol, e + ) + ) + raise ExchangeRequestError(error=e) + + return trades diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 75f4ec3c..a0f3a4bf 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -899,6 +899,22 @@ class Exchange: """ pass + @abstractmethod + def process_order(self, order): + """ + Similar to get_order but looks only for executed orders. + + Parameters + ---------- + order: Order + + Returns + ------- + float + Avg execution price + + """ + @abstractmethod def cancel_order(self, order_param, symbol_or_asset=None): """Cancel an open order. @@ -979,7 +995,7 @@ class Exchange: @abc.abstractmethod def get_orderbook(self, asset, order_type, limit): """ - Retrieve the the orderbook for the given trading pair. + Retrieve the orderbook for the given trading pair. Parameters ---------- @@ -993,3 +1009,20 @@ class Exchange: list[dict[str, float] """ pass + + @abc.abstractmethod + def get_trades(self, asset, my_trades, start_dt, limit): + """ + Retrieve a list of trades. + + Parameters + ---------- + my_trades: bool + List only my trades. + start_dt + limit + + Returns + ------- + + """ diff --git a/catalyst/exchange/exchange_blotter.py b/catalyst/exchange/exchange_blotter.py index d638e4bd..c8e6c531 100644 --- a/catalyst/exchange/exchange_blotter.py +++ b/catalyst/exchange/exchange_blotter.py @@ -1,4 +1,8 @@ +import numpy as np import pandas as pd +from logbook import Logger +from redo import retry + from catalyst.assets._assets import TradingPair from catalyst.constants import LOG_LEVEL from catalyst.exchange.exchange_errors import ExchangeRequestError @@ -8,8 +12,6 @@ from catalyst.finance.order import ORDER_STATUS from catalyst.finance.slippage import SlippageModel from catalyst.finance.transaction import create_transaction, Transaction from catalyst.utils.input_validation import expect_types -from logbook import Logger -from redo import retry log = Logger('exchange_blotter', level=LOG_LEVEL) @@ -93,7 +95,6 @@ class TradingPairFixedSlippage(SlippageModel): def simulate(self, data, asset, orders_for_asset): self._volume_for_bar = 0 - price = data.current(asset, 'close') dt = data.current_dt @@ -103,18 +104,20 @@ class TradingPairFixedSlippage(SlippageModel): order.check_triggers(price, dt) if not order.triggered: - log.debug('order has not reached the trigger at current ' - 'price {}'.format(price)) + log.info( + 'order has not reached the trigger at current ' + 'price {}'.format(price) + ) continue execution_price, execution_volume = self.process_order(data, order) + if execution_price is not None: + transaction = create_transaction( + order, dt, execution_price, execution_volume + ) - transaction = create_transaction( - order, dt, execution_price, execution_volume - ) - - self._volume_for_bar += abs(transaction.amount) - yield order, transaction + self._volume_for_bar += abs(transaction.amount) + yield order, transaction def process_order(self, data, order): price = data.current(order.asset, 'close') @@ -205,34 +208,25 @@ class ExchangeBlotter(Blotter): for order in self.open_orders[asset]: log.debug('found open order: {}'.format(order.id)) - new_order, executed_price = exchange.get_order(order.id, asset) - log.debug( - 'got updated order {} {}'.format( - new_order, executed_price + transactions = exchange.process_order(order) + # TODO: not letting partial orders through because of calculation issues + if transactions and order.status == ORDER_STATUS.FILLED: + avg_price = np.average( + a=[t.price for t in transactions], + weights=[t.amount for t in transactions], ) - ) - order.status = new_order.status - - if order.status == ORDER_STATUS.FILLED: - order.commission = new_order.commission - if order.amount != new_order.amount: - log.warn( - 'executed order amount {} differs ' - 'from original'.format( - new_order.amount, order.amount - ) + ostatus = 'filled' if order.open_amount == 0 else 'partial' + log.info( + '{} order {} / {}: {}, avg price: {}'.format( + ostatus, + order.id, + asset.symbol, + order.filled, + avg_price, ) - order.amount = new_order.amount - - transaction = Transaction( - asset=order.asset, - amount=order.amount, - dt=pd.Timestamp.utcnow(), - price=executed_price, - order_id=order.id, - commission=order.commission ) - yield order, transaction + for transaction in transactions: + yield order, transaction elif order.status == ORDER_STATUS.CANCELLED: yield order, None diff --git a/catalyst/exchange/exchange_data_portal.py b/catalyst/exchange/exchange_data_portal.py index 6f57b7e7..4f73079e 100644 --- a/catalyst/exchange/exchange_data_portal.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -291,6 +291,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): DataFrame """ + # TODO: verify that the exchange supports the timeframe bundle = self.exchange_bundles[exchange_name] # type: ExchangeBundle freq, candle_size, unit, adj_data_frequency = get_frequency( diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index 0e22868f..d5af87c4 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -100,6 +100,13 @@ class InvalidHistoryFrequencyError(ZiplineError): ).strip() +class UnsupportedHistoryFrequencyError(ZiplineError): + msg = ( + '{exchange} does not support candle frequency {freq}, please choose ' + 'from: {freqs}.' + ).strip() + + class InvalidHistoryTimeframeError(ZiplineError): msg = ( 'CCXT timeframe {timeframe} not supported by the exchange.' diff --git a/catalyst/exchange/utils/exchange_utils.py b/catalyst/exchange/utils/exchange_utils.py index 081da592..baf4c629 100644 --- a/catalyst/exchange/utils/exchange_utils.py +++ b/catalyst/exchange/utils/exchange_utils.py @@ -192,7 +192,7 @@ def get_symbols_string(assets): return ', '.join([asset.symbol for asset in array]) -def get_exchange_auth(exchange_name, environ=None): +def get_exchange_auth(exchange_name, alias=None, environ=None): """ The de-serialized contend of the exchange's auth.json file. @@ -207,7 +207,8 @@ def get_exchange_auth(exchange_name, environ=None): """ exchange_folder = get_exchange_folder(exchange_name, environ) - filename = os.path.join(exchange_folder, 'auth.json') + name = 'auth' if alias is None else alias + filename = os.path.join(exchange_folder, '{}.json'.format(name)) if os.path.isfile(filename): with open(filename) as data_file: diff --git a/catalyst/exchange/utils/factory.py b/catalyst/exchange/utils/factory.py index 5650c42b..77b2d708 100644 --- a/catalyst/exchange/utils/factory.py +++ b/catalyst/exchange/utils/factory.py @@ -13,12 +13,12 @@ exchange_cache = dict() def get_exchange(exchange_name, base_currency=None, must_authenticate=False, - skip_init=False): + skip_init=False, auth_alias=None): key = (exchange_name, base_currency) if key in exchange_cache: return exchange_cache[key] - exchange_auth = get_exchange_auth(exchange_name) + exchange_auth = get_exchange_auth(exchange_name, alias=auth_alias) has_auth = (exchange_auth['key'] != '' and exchange_auth['secret'] != '') if must_authenticate and not has_auth: diff --git a/catalyst/exchange/utils/stats_utils.py b/catalyst/exchange/utils/stats_utils.py index 8cc9b6ca..6ca58d13 100644 --- a/catalyst/exchange/utils/stats_utils.py +++ b/catalyst/exchange/utils/stats_utils.py @@ -359,9 +359,13 @@ def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None, pid = os.getpid() parts = uri.split('//') - obj = s3.Object(parts[1], '{}/{}-{}-{}.csv'.format( - folder, timestr, algo_namespace, pid - )) + path = '{folder}/{algo}/{time}-{algo}-{pid}.csv'.format( + folder=folder, + algo=algo_namespace, + time=timestr, + pid=pid, + ) + obj = s3.Object(parts[1], path) obj.put(Body=bytes_to_write) diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 3d83426c..d1945fc6 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -91,6 +91,7 @@ def _run(handle_data, live_graph, analyze_live, simulate_orders, + auth_aliases, stats_output): """Run a backtest for the given algorithm. @@ -163,14 +164,19 @@ def _run(handle_data, raise ValueError('Please specify at least one exchange.') exchange_list = [x.strip().lower() for x in exchange.split(',')] - exchanges = dict() - for exchange_name in exchange_list: - exchanges[exchange_name] = get_exchange( - exchange_name=exchange_name, + for name in exchange_list: + if auth_aliases is not None and name in auth_aliases: + auth_alias = auth_aliases[name] + else: + auth_alias = None + + exchanges[name] = get_exchange( + exchange_name=name, base_currency=base_currency, must_authenticate=(live and not simulate_orders), skip_init=True, + auth_alias=auth_alias, ) open_calendar = get_calendar('OPEN') @@ -391,6 +397,7 @@ def run_algorithm(initialize, live_graph=False, analyze_live=None, simulate_orders=True, + auth_aliases=None, stats_output=None, output=os.devnull): """Run a trading algorithm. @@ -524,5 +531,6 @@ def run_algorithm(initialize, live_graph=live_graph, analyze_live=analyze_live, simulate_orders=simulate_orders, + auth_aliases=auth_aliases, stats_output=stats_output ) diff --git a/docs/source/releases.rst b/docs/source/releases.rst index 5bcfb7d6..78d17ca8 100644 --- a/docs/source/releases.rst +++ b/docs/source/releases.rst @@ -2,6 +2,17 @@ Release Notes ============= +Version 0.4.5 +^^^^^^^^^^^^^ +**Release Date**: 2018-01-12 + +Bug Fixes +~~~~~~~~~ +- Improved order execution for exchanges supporting trade lists (:issue:`151`) +- Fixed an issue where requesting history of multiple assets repeats values +- Raising an error for order amounts smaller than exchange lots +- Handling multiple req errors with tickers more gracefully (:issue:`160`) + Version 0.4.4 ^^^^^^^^^^^^^ **Release Date**: 2018-01-09 diff --git a/tests/exchange/test_ccxt.py b/tests/exchange/test_ccxt.py index 7ef939b1..04112675 100644 --- a/tests/exchange/test_ccxt.py +++ b/tests/exchange/test_ccxt.py @@ -1,7 +1,7 @@ import pandas as pd from logbook import Logger -from base import BaseExchangeTestCase +from .base import BaseExchangeTestCase from catalyst.exchange.ccxt.ccxt_exchange import CCXT from catalyst.exchange.exchange_execution import ExchangeLimitOrder from catalyst.exchange.utils.exchange_utils import get_exchange_auth @@ -76,6 +76,22 @@ class TestCCXT(BaseExchangeTestCase): assert len(tickers) == 1 pass + def test_my_trades(self): + asset = self.exchange.get_asset('eng_eth') + + trades = self.exchange.get_trades(asset) + assert trades + pass + + def test_get_executed_order(self): + log.info('retrieving executed order') + asset = self.exchange.get_asset('eng_eth') + + order = self.exchange.get_order('165784', asset) + transactions = self.exchange.process_order(order) + assert transactions + pass + def test_get_balances(self): log.info('testing wallet balances') # balances = self.exchange.get_balances() diff --git a/tests/exchange/test_suites/test_suite_exchange.py b/tests/exchange/test_suites/test_suite_exchange.py index 3eb3b38b..29baf739 100644 --- a/tests/exchange/test_suites/test_suite_exchange.py +++ b/tests/exchange/test_suites/test_suite_exchange.py @@ -184,13 +184,13 @@ class TestSuiteExchange(WithLogger, ZiplineTestCase): ) sleep(1) - open_order, _ = exchange.get_order(order.id, asset) + open_order = exchange.get_order(order.id, asset) self.assertEqual(0, open_order.status) exchange.cancel_order(open_order, asset) sleep(1) - canceled_order, _ = exchange.get_order(open_order.id, asset) + canceled_order = exchange.get_order(open_order.id, asset) warnings = [record for record in log_catcher.records if record.level == WARNING]