From 2a239fd5bbf71fb189f1148c85591ae4f19b9126 Mon Sep 17 00:00:00 2001 From: Frederic Fortier Date: Sat, 27 Jan 2018 20:53:38 -0500 Subject: [PATCH] BLD: made some adjustment to generate and use the exchange config more efficiently and without mapping. Currently testing. --- catalyst/exchange/ccxt/ccxt_exchange.py | 217 +----------- catalyst/exchange/utils/ccxt_utils.py | 311 ++++++++++++++++++ catalyst/exchange/utils/factory.py | 32 +- .../exchange/utils/serialization_utils.py | 4 + .../test_suites/test_suite_exchange.py | 36 +- 5 files changed, 336 insertions(+), 264 deletions(-) create mode 100644 catalyst/exchange/utils/ccxt_utils.py diff --git a/catalyst/exchange/ccxt/ccxt_exchange.py b/catalyst/exchange/ccxt/ccxt_exchange.py index 9a5dad60..c804b9cb 100644 --- a/catalyst/exchange/ccxt/ccxt_exchange.py +++ b/catalyst/exchange/ccxt/ccxt_exchange.py @@ -4,11 +4,6 @@ from collections import defaultdict import ccxt import pandas as pd import six -from ccxt import InvalidOrder, NetworkError, \ - ExchangeError -from logbook import Logger -from six import string_types - from catalyst.algorithm import MarketOrder from catalyst.assets._assets import TradingPair from catalyst.constants import LOG_LEVEL @@ -16,26 +11,18 @@ from catalyst.exchange.exchange import Exchange from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ ExchangeSymbolsNotFound, ExchangeRequestError, InvalidOrderStyle, \ - ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError, \ - UnsupportedHistoryFrequencyError - ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError, \ - MarketsNotFoundError, InvalidMarketError + UnsupportedHistoryFrequencyError, \ + ExchangeNotFoundError, CreateOrderError, InvalidHistoryTimeframeError from catalyst.exchange.exchange_execution import ExchangeLimitOrder -from catalyst.exchange.utils.exchange_utils import mixin_market_params, \ - get_exchange_folder, get_catalyst_symbol, \ - get_exchange_auth +from catalyst.exchange.utils.ccxt_utils import get_exchange_config from catalyst.exchange.utils.datetime_utils import from_ms_timestamp, \ get_epoch, \ get_periods_range -from catalyst.exchange.utils.exchange_utils import from_ms_timestamp, \ - get_epoch, get_catalyst_symbol, \ - get_exchange_auth, get_exchange_config from catalyst.finance.order import Order, ORDER_STATUS from catalyst.finance.transaction import Transaction from ccxt import InvalidOrder, NetworkError, \ ExchangeError from logbook import Logger -from redo import retry from six import string_types log = Logger('CCXT', level=LOG_LEVEL) @@ -51,7 +38,7 @@ SUPPORTED_EXCHANGES = dict( class CCXT(Exchange): - def __init__(self, exchange_name, key, secret, base_currency): + def __init__(self, exchange_name, key, secret, base_currency, config=None): log.debug( 'finding {} in CCXT exchanges:\n{}'.format( exchange_name, ccxt.exchanges @@ -92,7 +79,7 @@ class CCXT(Exchange): self.bundle = ExchangeBundle(self.name) self._is_init = False - self._config = None + self._config = config def init(self): if self._is_init: @@ -118,136 +105,6 @@ class CCXT(Exchange): asset = TradingPair(**asset_dict) self.assets.append(asset) - def _fetch_markets(self): - markets_symbols = self.api.load_markets() - log.debug( - 'fetching {} markets:\n{}'.format( - self.name, markets_symbols - ) - ) - try: - markets = self.api.fetch_markets() - - except NetworkError as e: - raise ExchangeRequestError(error=e) - - if not markets: - raise MarketsNotFoundError( - exchange=self.name, - ) - - for market in markets: - if 'id' not in market: - raise InvalidMarketError( - exchange=self.name, - market=market, - ) - return markets - - def create_exchange_config(self): - config = dict( - name=self.name, - features=[feature for feature in self.has if self.has[feature]] - ) - markets = retry( - action=self._fetch_markets, - attempts=5, - sleeptime=5, - retry_exceptions=(ExchangeRequestError,), - cleanup=lambda: log.warn( - 'fetching markets again for {}'.format(self.name) - ), - ) - - config['assets'] = [] - for market in markets: - asset = self.create_trading_pair(market=market) - config['assets'].append(asset) - - return config - - def create_trading_pair(self, market, start_dt=None, end_dt=None, - leverage=1, end_daily=None, end_minute=None): - """ - Creating a TradingPair from market and asset data. - - Parameters - ---------- - market: dict[str, Object] - start_dt - end_dt - leverage - end_daily - end_minute - - Returns - ------- - - """ - params = dict( - exchange=self.name, - data_source='catalyst', - exchange_symbol=market['id'], - symbol=get_catalyst_symbol(market), - start_date=start_dt, - end_date=end_dt, - leverage=leverage, - asset_name=market['symbol'], - end_daily=end_daily, - end_minute=end_minute, - ) - self.apply_conditional_market_params(params, market) - - return TradingPair(**params) - - @staticmethod - def find_exchanges(features=None, is_authenticated=False): - ccxt_features = [] - if features is not None: - for feature in features: - if not feature.endswith('Bundle'): - ccxt_features.append(feature) - - exchange_names = [] - for exchange_name in ccxt.exchanges: - if is_authenticated: - exchange_auth = get_exchange_auth(exchange_name) - - has_auth = (exchange_auth['key'] != '' - and exchange_auth['secret'] != '') - - if not has_auth: - continue - - log.debug('loading exchange: {}'.format(exchange_name)) - exchange = getattr(ccxt, exchange_name)() - - if ccxt_features is None: - has_feature = True - - else: - try: - has_feature = all( - [exchange.has[feature] for feature in ccxt_features] - ) - - except Exception: - has_feature = False - - if has_feature: - try: - log.info('initializing {}'.format(exchange_name)) - exchange_names.append(exchange_name) - - except Exception as e: - log.warn( - 'unable to initialize exchange {}: {}'.format( - exchange_name, e - ) - ) - - return exchange_names - def account(self): return None @@ -295,13 +152,7 @@ class CCXT(Exchange): if source == 'ccxt': if isinstance(asset_or_symbol, string_types): parts = asset_or_symbol.split('/') - base_currency = self.substitute_currency_code( - parts[0], source - ) - quote_currency = self.substitute_currency_code( - parts[1], source - ) - return '{}_{}'.format(base_currency, quote_currency) + return '{}_{}'.format(parts[0].lower(), parts[1].lower()) else: return asset_or_symbol.symbol @@ -312,13 +163,7 @@ class CCXT(Exchange): ) else asset_or_symbol.symbol parts = symbol.split('_') - base_currency = self.substitute_currency_code( - parts[0], source - ) - quote_currency = self.substitute_currency_code( - parts[1], source - ) - return '{}/{}'.format(base_currency, quote_currency) + return '{}/{}'.format(parts[0].upper(), parts[1].upper()) @staticmethod def map_frequency(value, source='ccxt', raise_error=True): @@ -525,54 +370,6 @@ class CCXT(Exchange): except ExchangeSymbolsNotFound: return None - def apply_conditional_market_params(self, params, market): - """ - Applies a CCXT market dict to parameters of TradingPair init. - - Parameters - ---------- - params: dict[Object] - market: dict[Object] - - Returns - ------- - - """ - # TODO: make this more externalized / configurable - # Consider representing in some type of JSON structure - if 'active' in market: - params['trading_state'] = 1 if market['active'] else 0 - - else: - params['trading_state'] = 1 - - if 'lot' in market: - params['min_trade_size'] = market['lot'] - params['lot'] = market['lot'] - - if self.name == 'bitfinex': - params['maker'] = 0.001 - params['taker'] = 0.002 - - elif 'maker' in market and 'taker' in market \ - and market['maker'] is not None \ - and market['taker'] is not None: - params['maker'] = market['maker'] - params['taker'] = market['taker'] - - else: - # TODO: default commission, make configurable - params['maker'] = 0.0015 - params['taker'] = 0.0025 - - info = market['info'] if 'info' in market else None - if info: - if 'minimum_order_size' in info: - params['min_trade_size'] = float(info['minimum_order_size']) - - if 'lot' not in params: - params['lot'] = params['min_trade_size'] - def get_balances(self): try: log.debug('retrieving wallets balances') diff --git a/catalyst/exchange/utils/ccxt_utils.py b/catalyst/exchange/utils/ccxt_utils.py new file mode 100644 index 00000000..5c0484fb --- /dev/null +++ b/catalyst/exchange/utils/ccxt_utils.py @@ -0,0 +1,311 @@ +import json +import os +import pandas as pd +from six.moves.urllib import request + +from catalyst.assets._assets import TradingPair +from ccxt import NetworkError +from catalyst.constants import LOG_LEVEL, EXCHANGE_CONFIG_URL +from catalyst.exchange.exchange_errors import MarketsNotFoundError, \ + InvalidMarketError +from catalyst.exchange.utils.exchange_utils import get_catalyst_symbol, \ + get_exchange_folder, get_exchange_auth +from catalyst.exchange.utils.serialization_utils import ExchangeJSONDecoder, \ + ExchangeJSONEncoder +from logbook import Logger +from redo import retry +from ccxt.base.exchange import Exchange +from catalyst.utils.paths import last_modified_time, data_root, \ + ensure_directory +import ccxt + +log = Logger('ccxt_utils', level=LOG_LEVEL) + + +def find_exchange_configs(features=None, history=None, is_authenticated=False, + path=None): + """ + Finding exchanges from their config files + + Parameters + ---------- + features + is_authenticated + + Returns + ------- + + """ + exchange_config = [] + for exchange_name in ccxt.exchanges: + config = get_exchange_config(exchange_name, path) + if not config or 'error' in config: + log.info( + 'skipping invalid exchange {}'.format(exchange_name) + ) + + # Check if the exchange has an auth.json file + if is_authenticated: + exchange_auth = get_exchange_auth(exchange_name) + has_auth = (exchange_auth['key'] != '' + and exchange_auth['secret'] != '') + + if not has_auth: + continue + + if features is None: + has_features = True + + else: + try: + supported_features = [ + feature for feature in features if + feature in config['features'] + ] + has_features = len(supported_features) > 0 + except Exception: + has_features = False + + # TODO: filter by history + if has_features: + try: + exchange_config.append(config) + + except Exception as e: + log.warn( + 'unable to initialize exchange {}: {}'.format( + exchange_name, e + ) + ) + + return exchange_config + + +def get_exchange_config(exchange_name, path=None, environ=None, + expiry='2H'): + """ + The de-serialized content of the exchange's config.json. + Parameters + ---------- + exchange_name: str + The exchange name + filename: str + The target file + environ: + + Returns + ------- + config: dict[srt, Object] + The config dictionary. + + """ + try: + if path is None: + root = data_root(environ) + path = os.path.join(root, 'exchanges') + + folder = os.path.join(path, exchange_name) + ensure_directory(folder) + + filename = os.path.join(folder, 'config.json') + url = EXCHANGE_CONFIG_URL.format(exchange=exchange_name) + if os.path.isfile(filename): + # If the file exists, only update periodically to avoid + # unnecessary calls + now = pd.Timestamp.utcnow() + limit = pd.Timedelta(expiry) + if pd.Timedelta(now - last_modified_time(filename)) > limit: + request.urlretrieve(url=url, filename=filename) + + else: + request.urlretrieve(url=url, filename=filename) + + with open(filename) as data_file: + data = json.load(data_file, cls=ExchangeJSONDecoder) + return data + + except Exception as e: + log.warn( + 'unable to download {} config: {}'.format( + exchange_name, e + ) + ) + return dict(error=e) + + +def save_exchange_config(config, filename=None, environ=None): + """ + Save assets into an exchange_config file. + Parameters + ---------- + exchange_name: str + config + environ + Returns + ------- + """ + if filename is None: + name = 'config.json' + exchange_folder = get_exchange_folder(config['id'], environ) + filename = os.path.join(exchange_folder, name) + + with open(filename, 'w+') as handle: + json.dump(config, handle, indent=4, cls=ExchangeJSONEncoder) + + +def fetch_markets(ccxt_exchange): + """ + Fetches CCXT market objects. + + Parameters + ---------- + ccxt_exchange: Exchange + + Returns + ------- + + """ + markets_symbols = ccxt_exchange.load_markets() + log.debug( + 'fetching {} markets:\n{}'.format( + ccxt_exchange.name, markets_symbols + ) + ) + markets = ccxt_exchange.fetch_markets() + + if not markets: + raise MarketsNotFoundError( + exchange=ccxt_exchange.name, + ) + + for market in markets: + if 'id' not in market: + raise InvalidMarketError( + exchange=ccxt_exchange.name, + market=market, + ) + return markets + + +def create_exchange_config(ccxt_exchange): + """ + Creates an exchange config structure. + + Parameters + ---------- + ccxt_exchange: Exchange + + Returns + ------- + + """ + exchange_name = ccxt_exchange.__class__.__name__ + config = dict( + id=exchange_name, + name=ccxt_exchange.name, + features=[ + feature for feature in ccxt_exchange.has if + ccxt_exchange.has[feature] + ] + ) + markets = retry( + action=fetch_markets, + attempts=5, + sleeptime=5, + retry_exceptions=(NetworkError,), + cleanup=lambda: log.warn( + 'fetching markets again for {}'.format(exchange_name) + ), + args=(ccxt_exchange,) + ) + + config['assets'] = [] + for market in markets: + asset = create_trading_pair(exchange_name, market) + config['assets'].append(asset) + + return config + + +def create_trading_pair(exchange_name, market, start_dt=None, end_dt=None, + leverage=1, end_daily=None, end_minute=None): + """ + Creating a TradingPair from market and asset data. + + Parameters + ---------- + market: dict[str, Object] + start_dt + end_dt + leverage + end_daily + end_minute + + Returns + ------- + + """ + params = dict( + exchange=exchange_name, + data_source='catalyst', + exchange_symbol=market['id'], + symbol=get_catalyst_symbol(market), + start_date=start_dt, + end_date=end_dt, + leverage=leverage, + asset_name=market['symbol'], + end_daily=end_daily, + end_minute=end_minute, + ) + apply_conditional_market_params(exchange_name, params, market) + + return TradingPair(**params) + + +def apply_conditional_market_params(exchange_name, params, market): + """ + Applies a CCXT market dict to parameters of TradingPair init. + + Parameters + ---------- + params: dict[Object] + market: dict[Object] + + Returns + ------- + + """ + # TODO: make this more externalized / configurable + # Consider representing in some type of JSON structure + if 'active' in market: + params['trading_state'] = 1 if market['active'] else 0 + + else: + params['trading_state'] = 1 + + if 'lot' in market: + params['min_trade_size'] = market['lot'] + params['lot'] = market['lot'] + + if exchange_name == 'bitfinex': + params['maker'] = 0.001 + params['taker'] = 0.002 + + elif 'maker' in market and 'taker' in market \ + and market['maker'] is not None \ + and market['taker'] is not None: + params['maker'] = market['maker'] + params['taker'] = market['taker'] + + else: + # TODO: default commission, make configurable + params['maker'] = 0.0015 + params['taker'] = 0.0025 + + info = market['info'] if 'info' in market else None + if info: + if 'minimum_order_size' in info: + params['min_trade_size'] = float(info['minimum_order_size']) + + if 'lot' not in params: + params['lot'] = params['min_trade_size'] diff --git a/catalyst/exchange/utils/factory.py b/catalyst/exchange/utils/factory.py index 77b2d708..d4e5c450 100644 --- a/catalyst/exchange/utils/factory.py +++ b/catalyst/exchange/utils/factory.py @@ -4,8 +4,9 @@ from catalyst.constants import LOG_LEVEL from catalyst.exchange.ccxt.ccxt_exchange import CCXT from catalyst.exchange.exchange import Exchange from catalyst.exchange.exchange_errors import ExchangeAuthEmpty +from catalyst.exchange.utils.ccxt_utils import find_exchange_configs from catalyst.exchange.utils.exchange_utils import get_exchange_auth, \ - get_exchange_folder, is_blacklist + get_exchange_folder from logbook import Logger log = Logger('factory', level=LOG_LEVEL) @@ -13,7 +14,7 @@ exchange_cache = dict() def get_exchange(exchange_name, base_currency=None, must_authenticate=False, - skip_init=False, auth_alias=None): + skip_init=False, auth_alias=None, config=None): key = (exchange_name, base_currency) if key in exchange_cache: return exchange_cache[key] @@ -34,6 +35,7 @@ def get_exchange(exchange_name, base_currency=None, must_authenticate=False, key=exchange_auth['key'], secret=exchange_auth['secret'], base_currency=base_currency, + config=config, ) exchange_cache[key] = exchange @@ -51,8 +53,8 @@ def get_exchanges(exchange_names): return exchanges -def find_exchanges(features=None, skip_blacklist=True, is_authenticated=False, - base_currency=None): +def find_exchanges(features=None, history=None, skip_blacklist=True, path=None, + is_authenticated=False, base_currency=None): """ Find exchanges filtered by a list of feature. @@ -70,28 +72,20 @@ def find_exchanges(features=None, skip_blacklist=True, is_authenticated=False, list[Exchange] """ - exchange_names = CCXT.find_exchanges(features, is_authenticated) - + exchange_configs = find_exchange_configs( + features, history, is_authenticated, path + ) exchanges = [] - for exchange_name in exchange_names: - if skip_blacklist and is_blacklist(exchange_name): + for config in exchange_configs: + if skip_blacklist and (config is None or 'error' in config): continue exchange = get_exchange( - exchange_name=exchange_name, + exchange_name=config['id'], skip_init=True, base_currency=base_currency, + config=config, ) - - if features is not None: - if 'dailyBundle' in features \ - and not exchange.has_bundle('daily'): - continue - - elif 'minuteBundle' in features \ - and not exchange.has_bundle('minute'): - continue - exchanges.append(exchange) return exchanges diff --git a/catalyst/exchange/utils/serialization_utils.py b/catalyst/exchange/utils/serialization_utils.py index 62ab74d5..6e1d21e3 100644 --- a/catalyst/exchange/utils/serialization_utils.py +++ b/catalyst/exchange/utils/serialization_utils.py @@ -3,6 +3,7 @@ import re from json import JSONEncoder import pandas as pd +from catalyst.assets._assets import TradingPair from catalyst.constants import DATE_TIME_FORMAT from six import string_types @@ -12,6 +13,9 @@ class ExchangeJSONEncoder(json.JSONEncoder): if isinstance(obj, pd.Timestamp): return obj.strftime(DATE_TIME_FORMAT) + elif isinstance(obj, TradingPair): + return obj.to_dict() + # Let the base class default method raise the TypeError return JSONEncoder.default(self, obj) diff --git a/tests/exchange/test_suites/test_suite_exchange.py b/tests/exchange/test_suites/test_suite_exchange.py index 4088a675..ca4357a5 100644 --- a/tests/exchange/test_suites/test_suite_exchange.py +++ b/tests/exchange/test_suites/test_suite_exchange.py @@ -21,47 +21,13 @@ log = Logger('TestSuiteExchange') class TestSuiteExchange(WithLogger, ZiplineTestCase): - def _test_markets_exchange(self, exchange, attempts=0): - assets = None - try: - exchange.init() - - # Verify that the assets and markets are populated - if not exchange.markets: - raise ValueError( - 'no markets found' - ) - if not exchange.assets: - raise ValueError( - 'no assets derived from markets' - ) - assets = exchange.assets - - except ExchangeRequestError as e: - sleep(5) - - if attempts > 5: - handle_exchange_error(exchange, e) - - else: - print( - 're-trying an exchange request {} {}'.format( - exchange.name, attempts - ) - ) - self._test_markets_exchange(exchange, attempts + 1) - - except Exception as e: - handle_exchange_error(exchange, e) - - return assets - def test_markets(self): population = 3 results = dict() exchanges = select_random_exchanges(population) # Type: list[Exchange] for exchange in exchanges: + exchange.init() assets = self._test_markets_exchange(exchange) if assets is not None: