mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 05:32:55 +08:00
BLD: tested ccxt with manual data ingestion
This commit is contained in:
@@ -7,7 +7,8 @@ import logbook
|
||||
For example, if you want to see the DEBUG messages, run:
|
||||
$ export CATALYST_LOG_LEVEL=10
|
||||
'''
|
||||
LOG_LEVEL = int(os.environ.get('CATALYST_LOG_LEVEL', logbook.INFO))
|
||||
# LOG_LEVEL = int(os.environ.get('CATALYST_LOG_LEVEL', logbook.INFO))
|
||||
LOG_LEVEL = logbook.DEBUG
|
||||
|
||||
SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \
|
||||
'{exchange}/symbols.json'
|
||||
|
||||
@@ -57,12 +57,14 @@ class CCXT(Exchange):
|
||||
except Exception:
|
||||
raise ExchangeNotFoundError(exchange_name=exchange_name)
|
||||
|
||||
markets = self.api.load_markets()
|
||||
log.debug('the markets:\n{}'.format(markets))
|
||||
self._symbol_maps = [None, None]
|
||||
|
||||
markets_symbols = self.api.load_markets()
|
||||
log.debug('the markets:\n{}'.format(markets_symbols))
|
||||
|
||||
self.name = exchange_name
|
||||
|
||||
self.assets = dict()
|
||||
self.markets = self.api.fetch_markets()
|
||||
self.load_assets()
|
||||
|
||||
self.base_currency = base_currency
|
||||
@@ -81,6 +83,14 @@ class CCXT(Exchange):
|
||||
def time_skew(self):
|
||||
return None
|
||||
|
||||
def get_market(self, symbol):
|
||||
s = self.get_symbol(symbol)
|
||||
market = next(
|
||||
(market for market in self.markets if market['symbol'] == s),
|
||||
None,
|
||||
)
|
||||
return market
|
||||
|
||||
def get_symbol(self, asset_or_symbol):
|
||||
symbol = asset_or_symbol if isinstance(
|
||||
asset_or_symbol, string_types
|
||||
@@ -158,74 +168,102 @@ class CCXT(Exchange):
|
||||
except ExchangeSymbolsNotFound:
|
||||
return None
|
||||
|
||||
def _fetch_asset(self, market_id, is_local=False):
|
||||
def fetch_asset_defs(self, market):
|
||||
asset_defs = []
|
||||
|
||||
for is_local in (False, True):
|
||||
asset_def = self.fetch_asset_def(market, is_local)
|
||||
asset_defs.append((asset_def, is_local))
|
||||
|
||||
return asset_defs
|
||||
|
||||
def fetch_asset_def(self, market, is_local=False):
|
||||
exchange_symbol = market['id']
|
||||
|
||||
symbol_map = self._fetch_symbol_map(is_local)
|
||||
if symbol_map is not None:
|
||||
assets_lower = {k.lower(): v for k, v in symbol_map.items()}
|
||||
key = market_id.lower()
|
||||
key = exchange_symbol.lower()
|
||||
|
||||
asset = assets_lower[key] if key in assets_lower else None
|
||||
if asset is not None:
|
||||
return asset, is_local
|
||||
|
||||
elif not is_local:
|
||||
return self._fetch_asset(market_id, True)
|
||||
return asset
|
||||
|
||||
else:
|
||||
return None, is_local
|
||||
|
||||
elif not is_local:
|
||||
return self._fetch_asset(market_id, True)
|
||||
return None
|
||||
|
||||
else:
|
||||
return None, is_local
|
||||
return None
|
||||
|
||||
def create_trading_pair(self, market, asset_def, is_local):
|
||||
"""
|
||||
Creating a TradingPair from market and asset data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
market: dict[str, Object]
|
||||
asset_def: dict[str, Object]
|
||||
is_local: bool
|
||||
|
||||
Returns
|
||||
-------
|
||||
|
||||
"""
|
||||
data_source = 'local' if is_local else 'catalyst'
|
||||
params = dict(
|
||||
exchange=self.name,
|
||||
data_source=data_source,
|
||||
exchange_symbol=market['id'],
|
||||
)
|
||||
mixin_market_params(self.name, params, market)
|
||||
|
||||
if asset_def is not None:
|
||||
params['symbol'] = asset_def['symbol']
|
||||
|
||||
params['start_date'] = asset_def['start_date'] \
|
||||
if 'start_date' in asset_def else None
|
||||
|
||||
params['end_date'] = asset_def['end_date'] \
|
||||
if 'end_date' in asset_def else None
|
||||
|
||||
params['leverage'] = asset_def['leverage'] \
|
||||
if 'leverage' in asset_def else 1.0
|
||||
|
||||
params['asset_name'] = asset_def['asset_name'] \
|
||||
if 'asset_name' in asset_def else None
|
||||
|
||||
params['end_daily'] = asset_def['end_daily'] \
|
||||
if 'end_daily' in asset_def \
|
||||
and asset_def['end_daily'] != 'N/A' else None
|
||||
|
||||
params['end_minute'] = asset_def['end_minute'] \
|
||||
if 'end_minute' in asset_def \
|
||||
and asset_def['end_minute'] != 'N/A' else None
|
||||
|
||||
else:
|
||||
params['symbol'] = self.get_catalyst_symbol(market)
|
||||
params['leverage'] = 1.0
|
||||
|
||||
return TradingPair(**params)
|
||||
|
||||
def load_assets(self):
|
||||
markets = self.api.fetch_markets()
|
||||
self.assets = []
|
||||
|
||||
for market in markets:
|
||||
asset, is_local = self._fetch_asset(market['id'])
|
||||
data_source = 'local' if is_local else 'catalyst'
|
||||
for market in self.markets:
|
||||
log.debug('fetching asset for market: {}'.format(market['id']))
|
||||
asset_defs = self.fetch_asset_defs(market)
|
||||
|
||||
params = dict(
|
||||
exchange=self.name,
|
||||
data_source=data_source,
|
||||
exchange_symbol=market['id'],
|
||||
)
|
||||
mixin_market_params(self.name, params, market)
|
||||
|
||||
if asset is not None:
|
||||
params['symbol'] = asset['symbol']
|
||||
|
||||
params['start_date'] = pd.to_datetime(
|
||||
asset['start_date'], utc=True
|
||||
) if 'start_date' in asset else None
|
||||
|
||||
params['end_date'] = pd.to_datetime(
|
||||
asset['end_date'], utc=True
|
||||
) if 'end_date' in asset else None
|
||||
|
||||
params['leverage'] = asset['leverage'] \
|
||||
if 'leverage' in asset else 1.0
|
||||
|
||||
params['asset_name'] = asset['asset_name'] \
|
||||
if 'asset_name' in asset else None
|
||||
|
||||
params['end_daily'] = pd.to_datetime(
|
||||
asset['end_daily'], utc=True
|
||||
) if 'end_daily' in asset and asset['end_daily'] != 'N/A' \
|
||||
else None
|
||||
|
||||
params['end_minute'] = pd.to_datetime(
|
||||
asset['end_minute'], utc=True
|
||||
) if 'end_minute' in asset and asset['end_minute'] != 'N/A' \
|
||||
else None
|
||||
|
||||
else:
|
||||
params['symbol'] = self.get_catalyst_symbol(market)
|
||||
|
||||
trading_pair = TradingPair(**params)
|
||||
self.assets[market['id']] = trading_pair
|
||||
for asset_def in asset_defs:
|
||||
if asset_def[0] is not None or not asset_defs[1]:
|
||||
try:
|
||||
asset = self.create_trading_pair(
|
||||
market=market,
|
||||
asset_def=asset_def[0],
|
||||
is_local=asset_def[1]
|
||||
)
|
||||
self.assets.append(asset)
|
||||
except TypeError as e:
|
||||
pass
|
||||
|
||||
def get_balances(self):
|
||||
try:
|
||||
@@ -293,7 +331,7 @@ class CCXT(Exchange):
|
||||
|
||||
order = Order(
|
||||
dt=date,
|
||||
asset=self.assets[symbol],
|
||||
asset=self.get_asset(symbol, is_exchange_symbol=True),
|
||||
amount=amount,
|
||||
stop=stop_price,
|
||||
limit=limit_price,
|
||||
|
||||
@@ -34,7 +34,8 @@ class Exchange:
|
||||
|
||||
def __init__(self):
|
||||
self.name = None
|
||||
self.assets = dict()
|
||||
self.assets = []
|
||||
self._symbol_maps = [None, None]
|
||||
self._portfolio = None
|
||||
self.minute_writer = None
|
||||
self.minute_reader = None
|
||||
@@ -145,9 +146,9 @@ class Exchange:
|
||||
"""
|
||||
symbol = None
|
||||
|
||||
for key in self.assets:
|
||||
if not symbol and self.assets[key].symbol == asset.symbol:
|
||||
symbol = key
|
||||
for a in self.assets:
|
||||
if not symbol and a.symbol == asset.symbol:
|
||||
symbol = a.symbol
|
||||
|
||||
if not symbol:
|
||||
raise ValueError('Currency %s not supported by exchange %s' %
|
||||
@@ -187,33 +188,32 @@ class Exchange:
|
||||
list[TradingPair]
|
||||
|
||||
"""
|
||||
assets = []
|
||||
|
||||
if symbols is not None:
|
||||
assets = []
|
||||
for symbol in symbols:
|
||||
asset = self.get_asset(symbol, data_frequency)
|
||||
assets.append(asset)
|
||||
return assets
|
||||
else:
|
||||
for key in self.assets:
|
||||
assets.append(self.assets[key])
|
||||
return self.assets
|
||||
|
||||
return assets
|
||||
|
||||
def _find_asset(self, asset, symbol, data_frequency, is_local=False):
|
||||
assets = self.assets
|
||||
for key in assets:
|
||||
def _find_asset(self, asset, symbol, data_frequency, is_exchange_symbol,
|
||||
is_local=False):
|
||||
for a in self.assets:
|
||||
has_data = (data_frequency == 'minute'
|
||||
and assets[key].end_minute is not None) \
|
||||
and a.end_minute is not None) \
|
||||
or (data_frequency == 'daily'
|
||||
and assets[key].end_daily is not None)
|
||||
and a.end_daily is not None)
|
||||
|
||||
if not asset and assets[key].symbol.lower() == symbol.lower() \
|
||||
symbol_attr = a.exchange_symbol if is_exchange_symbol else a.symbol
|
||||
if not asset and symbol_attr.lower() == symbol.lower() \
|
||||
and (not data_frequency or has_data):
|
||||
asset = assets[key]
|
||||
asset = a
|
||||
|
||||
return asset
|
||||
|
||||
def get_asset(self, symbol, data_frequency=None):
|
||||
def get_asset(self, symbol, data_frequency=None, is_exchange_symbol=False):
|
||||
"""
|
||||
The market for the specified symbol.
|
||||
|
||||
@@ -229,16 +229,19 @@ class Exchange:
|
||||
asset = None
|
||||
|
||||
log.debug('searching asset {} on the server'.format(symbol))
|
||||
asset = self._find_asset(asset, symbol, data_frequency, False)
|
||||
asset = self._find_asset(
|
||||
asset, symbol, data_frequency, is_exchange_symbol, False
|
||||
)
|
||||
|
||||
log.debug('asset {} not found on the server, searching local '
|
||||
'assets'.format(symbol))
|
||||
asset = self._find_asset(asset, symbol, data_frequency, True)
|
||||
asset = self._find_asset(
|
||||
asset, symbol, data_frequency, is_exchange_symbol, True
|
||||
)
|
||||
|
||||
if not asset:
|
||||
all_values = list(self.assets.values())
|
||||
supported_symbols = sorted([
|
||||
asset.symbol for asset in all_values
|
||||
asset.symbol for asset in self.assets
|
||||
])
|
||||
|
||||
raise SymbolNotFoundOnExchange(
|
||||
@@ -250,7 +253,14 @@ class Exchange:
|
||||
return asset
|
||||
|
||||
def fetch_symbol_map(self, is_local=False):
|
||||
return get_exchange_symbols(self.name, is_local)
|
||||
index = 1 if is_local else 0
|
||||
if self._symbol_maps[index] is not None:
|
||||
return self._symbol_maps[index]
|
||||
|
||||
else:
|
||||
symbol_map = get_exchange_symbols(self.name, is_local)
|
||||
self._symbol_maps[index] = symbol_map
|
||||
return symbol_map
|
||||
|
||||
@abstractmethod
|
||||
def load_assets(self, is_local=False):
|
||||
|
||||
@@ -31,7 +31,7 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
|
||||
PricingDataNotLoadedError, DataCorruptionError, ExchangeSymbolsNotFound, \
|
||||
PricingDataValueError
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder, \
|
||||
get_exchange_symbols, save_exchange_symbols
|
||||
get_exchange_symbols, save_exchange_symbols, mixin_market_params
|
||||
from catalyst.utils.cli import maybe_show_progress
|
||||
from catalyst.utils.paths import ensure_directory
|
||||
|
||||
@@ -667,12 +667,11 @@ class ExchangeBundle:
|
||||
|
||||
"""
|
||||
log.info('ingesting csv file: {}'.format(path))
|
||||
try:
|
||||
symbols_def = get_exchange_symbols(
|
||||
self.exchange_name, is_local=True
|
||||
)
|
||||
except ExchangeSymbolsNotFound:
|
||||
symbols_def = dict()
|
||||
|
||||
if self.exchange is None:
|
||||
# Avoid circular dependencies
|
||||
from catalyst.exchange.factory import get_exchange
|
||||
self.exchange = get_exchange(self.exchange_name)
|
||||
|
||||
problems = []
|
||||
df = pd.read_csv(
|
||||
@@ -705,24 +704,40 @@ class ExchangeBundle:
|
||||
end_dt = df.index.get_level_values(1).max()
|
||||
end_dt_key = 'end_{}'.format(data_frequency)
|
||||
|
||||
if symbol is symbols_def:
|
||||
symbol_def = symbols_def[symbol]
|
||||
market = self.exchange.get_market(symbol)
|
||||
if market is None:
|
||||
raise ValueError('symbol not available in the exchange.')
|
||||
|
||||
start_dt = symbol_def['start_date'] \
|
||||
if symbol_def['start_date'] < start_dt else start_dt
|
||||
params = dict(
|
||||
exchange=self.exchange.name,
|
||||
data_source='local',
|
||||
exchange_symbol=market['id'],
|
||||
)
|
||||
mixin_market_params(self.exchange_name, params, market)
|
||||
|
||||
end_dt = symbol_def[end_dt_key] \
|
||||
if symbol_def[end_dt_key] > end_dt else end_dt
|
||||
asset_def = self.exchange.fetch_asset_def(market, True)
|
||||
if asset_def is not None:
|
||||
params['symbol'] = asset_def['symbol']
|
||||
|
||||
end_daily = end_dt \
|
||||
if data_frequency == 'daily' else symbol_def['end_daily']
|
||||
params['start_date'] = asset_def['start_date'] \
|
||||
if asset_def['start_date'] < start_dt else start_dt
|
||||
|
||||
end_minute = end_dt \
|
||||
if data_frequency == 'minute' else symbol_def['end_minute']
|
||||
params['end_date'] = asset_def[end_dt_key] \
|
||||
if asset_def[end_dt_key] > end_dt else end_dt
|
||||
|
||||
params['end_daily'] = end_dt \
|
||||
if data_frequency == 'daily' else asset_def['end_daily']
|
||||
|
||||
params['end_minute'] = end_dt \
|
||||
if data_frequency == 'minute' else asset_def['end_minute']
|
||||
|
||||
else:
|
||||
end_daily = end_dt if data_frequency == 'daily' else 'N/A'
|
||||
end_minute = end_dt if data_frequency == 'minute' else 'N/A'
|
||||
params['symbol'] = self.exchange.get_catalyst_symbol(market)
|
||||
|
||||
params['end_daily'] = end_dt \
|
||||
if data_frequency == 'daily' else 'N/A'
|
||||
params['end_minute'] = end_dt \
|
||||
if data_frequency == 'minute' else 'N/A'
|
||||
|
||||
if min_start_dt is None or start_dt < min_start_dt:
|
||||
min_start_dt = start_dt
|
||||
@@ -730,19 +745,8 @@ class ExchangeBundle:
|
||||
if max_end_dt is None or end_dt > max_end_dt:
|
||||
max_end_dt = end_dt
|
||||
|
||||
asset = TradingPair(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
start_date=start_dt,
|
||||
end_date=end_dt,
|
||||
leverage=0, # TODO: add as an optional column
|
||||
asset_name=symbol,
|
||||
min_trade_size=0, # TODO: add as an optional column
|
||||
end_daily=end_daily,
|
||||
end_minute=end_minute,
|
||||
exchange_symbol=symbol
|
||||
)
|
||||
assets[symbol] = asset
|
||||
asset = TradingPair(**params)
|
||||
assets[market['id']] = asset
|
||||
|
||||
save_exchange_symbols(self.exchange_name, assets, True)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from datetime import date, datetime
|
||||
|
||||
import pandas as pd
|
||||
from catalyst.assets._assets import TradingPair
|
||||
from six import string_types
|
||||
from six.moves.urllib import request
|
||||
|
||||
from catalyst.constants import DATE_FORMAT, SYMBOLS_URL
|
||||
@@ -100,6 +101,20 @@ def download_exchange_symbols(exchange_name, environ=None):
|
||||
return response
|
||||
|
||||
|
||||
def symbols_parser(asset_def):
|
||||
for key, value in asset_def.items():
|
||||
match = isinstance(value, string_types) \
|
||||
and re.search(r'(\d{4}-\d{2}-\d{2})', value)
|
||||
|
||||
if match:
|
||||
try:
|
||||
asset_def[key] = pd.to_datetime(value, utc=True)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return asset_def
|
||||
|
||||
|
||||
def get_exchange_symbols(exchange_name, is_local=False, environ=None):
|
||||
"""
|
||||
The de-serialized content of the exchange's symbols.json.
|
||||
@@ -125,10 +140,10 @@ def get_exchange_symbols(exchange_name, is_local=False, environ=None):
|
||||
if os.path.isfile(filename):
|
||||
with open(filename) as data_file:
|
||||
try:
|
||||
data = json.load(data_file)
|
||||
data = json.load(data_file, object_hook=symbols_parser)
|
||||
return data
|
||||
|
||||
except ValueError:
|
||||
except ValueError as e:
|
||||
return dict()
|
||||
else:
|
||||
raise ExchangeSymbolsNotFound(
|
||||
|
||||
Reference in New Issue
Block a user