diff --git a/catalyst/constants.py b/catalyst/constants.py index 35e1f727..6372b11f 100644 --- a/catalyst/constants.py +++ b/catalyst/constants.py @@ -7,7 +7,8 @@ import logbook For example, if you want to see the DEBUG messages, run: $ export CATALYST_LOG_LEVEL=10 ''' -LOG_LEVEL = int(os.environ.get('CATALYST_LOG_LEVEL', logbook.INFO)) +# LOG_LEVEL = int(os.environ.get('CATALYST_LOG_LEVEL', logbook.INFO)) +LOG_LEVEL = logbook.DEBUG SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \ '{exchange}/symbols.json' diff --git a/catalyst/exchange/ccxt/ccxt_exchange.py b/catalyst/exchange/ccxt/ccxt_exchange.py index d2c735cc..4d5b3939 100644 --- a/catalyst/exchange/ccxt/ccxt_exchange.py +++ b/catalyst/exchange/ccxt/ccxt_exchange.py @@ -57,12 +57,14 @@ class CCXT(Exchange): except Exception: raise ExchangeNotFoundError(exchange_name=exchange_name) - markets = self.api.load_markets() - log.debug('the markets:\n{}'.format(markets)) + self._symbol_maps = [None, None] + + markets_symbols = self.api.load_markets() + log.debug('the markets:\n{}'.format(markets_symbols)) self.name = exchange_name - self.assets = dict() + self.markets = self.api.fetch_markets() self.load_assets() self.base_currency = base_currency @@ -81,6 +83,14 @@ class CCXT(Exchange): def time_skew(self): return None + def get_market(self, symbol): + s = self.get_symbol(symbol) + market = next( + (market for market in self.markets if market['symbol'] == s), + None, + ) + return market + def get_symbol(self, asset_or_symbol): symbol = asset_or_symbol if isinstance( asset_or_symbol, string_types @@ -158,74 +168,102 @@ class CCXT(Exchange): except ExchangeSymbolsNotFound: return None - def _fetch_asset(self, market_id, is_local=False): + def fetch_asset_defs(self, market): + asset_defs = [] + + for is_local in (False, True): + asset_def = self.fetch_asset_def(market, is_local) + asset_defs.append((asset_def, is_local)) + + return asset_defs + + def fetch_asset_def(self, market, is_local=False): + exchange_symbol = market['id'] + symbol_map = self._fetch_symbol_map(is_local) if symbol_map is not None: assets_lower = {k.lower(): v for k, v in symbol_map.items()} - key = market_id.lower() + key = exchange_symbol.lower() asset = assets_lower[key] if key in assets_lower else None if asset is not None: - return asset, is_local - - elif not is_local: - return self._fetch_asset(market_id, True) + return asset else: - return None, is_local - - elif not is_local: - return self._fetch_asset(market_id, True) + return None else: - return None, is_local + return None + + def create_trading_pair(self, market, asset_def, is_local): + """ + Creating a TradingPair from market and asset data. + + Parameters + ---------- + market: dict[str, Object] + asset_def: dict[str, Object] + is_local: bool + + Returns + ------- + + """ + data_source = 'local' if is_local else 'catalyst' + params = dict( + exchange=self.name, + data_source=data_source, + exchange_symbol=market['id'], + ) + mixin_market_params(self.name, params, market) + + if asset_def is not None: + params['symbol'] = asset_def['symbol'] + + params['start_date'] = asset_def['start_date'] \ + if 'start_date' in asset_def else None + + params['end_date'] = asset_def['end_date'] \ + if 'end_date' in asset_def else None + + params['leverage'] = asset_def['leverage'] \ + if 'leverage' in asset_def else 1.0 + + params['asset_name'] = asset_def['asset_name'] \ + if 'asset_name' in asset_def else None + + params['end_daily'] = asset_def['end_daily'] \ + if 'end_daily' in asset_def \ + and asset_def['end_daily'] != 'N/A' else None + + params['end_minute'] = asset_def['end_minute'] \ + if 'end_minute' in asset_def \ + and asset_def['end_minute'] != 'N/A' else None + + else: + params['symbol'] = self.get_catalyst_symbol(market) + params['leverage'] = 1.0 + + return TradingPair(**params) def load_assets(self): - markets = self.api.fetch_markets() + self.assets = [] - for market in markets: - asset, is_local = self._fetch_asset(market['id']) - data_source = 'local' if is_local else 'catalyst' + for market in self.markets: + log.debug('fetching asset for market: {}'.format(market['id'])) + asset_defs = self.fetch_asset_defs(market) - params = dict( - exchange=self.name, - data_source=data_source, - exchange_symbol=market['id'], - ) - mixin_market_params(self.name, params, market) - - if asset is not None: - params['symbol'] = asset['symbol'] - - params['start_date'] = pd.to_datetime( - asset['start_date'], utc=True - ) if 'start_date' in asset else None - - params['end_date'] = pd.to_datetime( - asset['end_date'], utc=True - ) if 'end_date' in asset else None - - params['leverage'] = asset['leverage'] \ - if 'leverage' in asset else 1.0 - - params['asset_name'] = asset['asset_name'] \ - if 'asset_name' in asset else None - - params['end_daily'] = pd.to_datetime( - asset['end_daily'], utc=True - ) if 'end_daily' in asset and asset['end_daily'] != 'N/A' \ - else None - - params['end_minute'] = pd.to_datetime( - asset['end_minute'], utc=True - ) if 'end_minute' in asset and asset['end_minute'] != 'N/A' \ - else None - - else: - params['symbol'] = self.get_catalyst_symbol(market) - - trading_pair = TradingPair(**params) - self.assets[market['id']] = trading_pair + for asset_def in asset_defs: + if asset_def[0] is not None or not asset_defs[1]: + try: + asset = self.create_trading_pair( + market=market, + asset_def=asset_def[0], + is_local=asset_def[1] + ) + self.assets.append(asset) + except TypeError as e: + pass def get_balances(self): try: @@ -293,7 +331,7 @@ class CCXT(Exchange): order = Order( dt=date, - asset=self.assets[symbol], + asset=self.get_asset(symbol, is_exchange_symbol=True), amount=amount, stop=stop_price, limit=limit_price, diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 317175d0..f38bfa36 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -34,7 +34,8 @@ class Exchange: def __init__(self): self.name = None - self.assets = dict() + self.assets = [] + self._symbol_maps = [None, None] self._portfolio = None self.minute_writer = None self.minute_reader = None @@ -145,9 +146,9 @@ class Exchange: """ symbol = None - for key in self.assets: - if not symbol and self.assets[key].symbol == asset.symbol: - symbol = key + for a in self.assets: + if not symbol and a.symbol == asset.symbol: + symbol = a.symbol if not symbol: raise ValueError('Currency %s not supported by exchange %s' % @@ -187,33 +188,32 @@ class Exchange: list[TradingPair] """ - assets = [] if symbols is not None: + assets = [] for symbol in symbols: asset = self.get_asset(symbol, data_frequency) assets.append(asset) + return assets else: - for key in self.assets: - assets.append(self.assets[key]) + return self.assets - return assets - - def _find_asset(self, asset, symbol, data_frequency, is_local=False): - assets = self.assets - for key in assets: + def _find_asset(self, asset, symbol, data_frequency, is_exchange_symbol, + is_local=False): + for a in self.assets: has_data = (data_frequency == 'minute' - and assets[key].end_minute is not None) \ + and a.end_minute is not None) \ or (data_frequency == 'daily' - and assets[key].end_daily is not None) + and a.end_daily is not None) - if not asset and assets[key].symbol.lower() == symbol.lower() \ + symbol_attr = a.exchange_symbol if is_exchange_symbol else a.symbol + if not asset and symbol_attr.lower() == symbol.lower() \ and (not data_frequency or has_data): - asset = assets[key] + asset = a return asset - def get_asset(self, symbol, data_frequency=None): + def get_asset(self, symbol, data_frequency=None, is_exchange_symbol=False): """ The market for the specified symbol. @@ -229,16 +229,19 @@ class Exchange: asset = None log.debug('searching asset {} on the server'.format(symbol)) - asset = self._find_asset(asset, symbol, data_frequency, False) + asset = self._find_asset( + asset, symbol, data_frequency, is_exchange_symbol, False + ) log.debug('asset {} not found on the server, searching local ' 'assets'.format(symbol)) - asset = self._find_asset(asset, symbol, data_frequency, True) + asset = self._find_asset( + asset, symbol, data_frequency, is_exchange_symbol, True + ) if not asset: - all_values = list(self.assets.values()) supported_symbols = sorted([ - asset.symbol for asset in all_values + asset.symbol for asset in self.assets ]) raise SymbolNotFoundOnExchange( @@ -250,7 +253,14 @@ class Exchange: return asset def fetch_symbol_map(self, is_local=False): - return get_exchange_symbols(self.name, is_local) + index = 1 if is_local else 0 + if self._symbol_maps[index] is not None: + return self._symbol_maps[index] + + else: + symbol_map = get_exchange_symbols(self.name, is_local) + self._symbol_maps[index] = symbol_map + return symbol_map @abstractmethod def load_assets(self, is_local=False): diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 2bfa6201..4f4e7fc6 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -31,7 +31,7 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ PricingDataNotLoadedError, DataCorruptionError, ExchangeSymbolsNotFound, \ PricingDataValueError from catalyst.exchange.exchange_utils import get_exchange_folder, \ - get_exchange_symbols, save_exchange_symbols + get_exchange_symbols, save_exchange_symbols, mixin_market_params from catalyst.utils.cli import maybe_show_progress from catalyst.utils.paths import ensure_directory @@ -667,12 +667,11 @@ class ExchangeBundle: """ log.info('ingesting csv file: {}'.format(path)) - try: - symbols_def = get_exchange_symbols( - self.exchange_name, is_local=True - ) - except ExchangeSymbolsNotFound: - symbols_def = dict() + + if self.exchange is None: + # Avoid circular dependencies + from catalyst.exchange.factory import get_exchange + self.exchange = get_exchange(self.exchange_name) problems = [] df = pd.read_csv( @@ -705,24 +704,40 @@ class ExchangeBundle: end_dt = df.index.get_level_values(1).max() end_dt_key = 'end_{}'.format(data_frequency) - if symbol is symbols_def: - symbol_def = symbols_def[symbol] + market = self.exchange.get_market(symbol) + if market is None: + raise ValueError('symbol not available in the exchange.') - start_dt = symbol_def['start_date'] \ - if symbol_def['start_date'] < start_dt else start_dt + params = dict( + exchange=self.exchange.name, + data_source='local', + exchange_symbol=market['id'], + ) + mixin_market_params(self.exchange_name, params, market) - end_dt = symbol_def[end_dt_key] \ - if symbol_def[end_dt_key] > end_dt else end_dt + asset_def = self.exchange.fetch_asset_def(market, True) + if asset_def is not None: + params['symbol'] = asset_def['symbol'] - end_daily = end_dt \ - if data_frequency == 'daily' else symbol_def['end_daily'] + params['start_date'] = asset_def['start_date'] \ + if asset_def['start_date'] < start_dt else start_dt - end_minute = end_dt \ - if data_frequency == 'minute' else symbol_def['end_minute'] + params['end_date'] = asset_def[end_dt_key] \ + if asset_def[end_dt_key] > end_dt else end_dt + + params['end_daily'] = end_dt \ + if data_frequency == 'daily' else asset_def['end_daily'] + + params['end_minute'] = end_dt \ + if data_frequency == 'minute' else asset_def['end_minute'] else: - end_daily = end_dt if data_frequency == 'daily' else 'N/A' - end_minute = end_dt if data_frequency == 'minute' else 'N/A' + params['symbol'] = self.exchange.get_catalyst_symbol(market) + + params['end_daily'] = end_dt \ + if data_frequency == 'daily' else 'N/A' + params['end_minute'] = end_dt \ + if data_frequency == 'minute' else 'N/A' if min_start_dt is None or start_dt < min_start_dt: min_start_dt = start_dt @@ -730,19 +745,8 @@ class ExchangeBundle: if max_end_dt is None or end_dt > max_end_dt: max_end_dt = end_dt - asset = TradingPair( - symbol=symbol, - exchange=self.exchange_name, - start_date=start_dt, - end_date=end_dt, - leverage=0, # TODO: add as an optional column - asset_name=symbol, - min_trade_size=0, # TODO: add as an optional column - end_daily=end_daily, - end_minute=end_minute, - exchange_symbol=symbol - ) - assets[symbol] = asset + asset = TradingPair(**params) + assets[market['id']] = asset save_exchange_symbols(self.exchange_name, assets, True) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 0ca18c95..c2250187 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -8,6 +8,7 @@ from datetime import date, datetime import pandas as pd from catalyst.assets._assets import TradingPair +from six import string_types from six.moves.urllib import request from catalyst.constants import DATE_FORMAT, SYMBOLS_URL @@ -100,6 +101,20 @@ def download_exchange_symbols(exchange_name, environ=None): return response +def symbols_parser(asset_def): + for key, value in asset_def.items(): + match = isinstance(value, string_types) \ + and re.search(r'(\d{4}-\d{2}-\d{2})', value) + + if match: + try: + asset_def[key] = pd.to_datetime(value, utc=True) + except ValueError: + pass + + return asset_def + + def get_exchange_symbols(exchange_name, is_local=False, environ=None): """ The de-serialized content of the exchange's symbols.json. @@ -125,10 +140,10 @@ def get_exchange_symbols(exchange_name, is_local=False, environ=None): if os.path.isfile(filename): with open(filename) as data_file: try: - data = json.load(data_file) + data = json.load(data_file, object_hook=symbols_parser) return data - except ValueError: + except ValueError as e: return dict() else: raise ExchangeSymbolsNotFound(