diff --git a/.gitignore b/.gitignore index 7e2c83e8..ff13509b 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,7 @@ zipline.iml ./data TAGS + +python2 +python3 +scratch diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 7dfe91b1..36593bab 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -28,9 +28,9 @@ except NameError: '--strict-extensions/--non-strict-extensions', is_flag=True, help='If --strict-extensions is passed then catalyst will not run if it' - ' cannot load all of the specified extensions. If this is not passed or' - ' --non-strict-extensions is passed then the failure will be logged but' - ' execution will continue.', + ' cannot load all of the specified extensions. If this is not passed or' + ' --non-strict-extensions is passed then the failure will be logged but' + ' execution will continue.', ) @click.option( '--default-extension/--no-default-extension', @@ -64,6 +64,7 @@ def extract_option_object(option): option_object : click.Option The option object that this decorator will create. """ + @option def opt(): pass @@ -95,7 +96,9 @@ def ipython_only(option): def _(*args, **kwargs): kwargs[argname] = None return f(*args, **kwargs) + return _ + return d @@ -117,9 +120,9 @@ def ipython_only(option): '--define', multiple=True, help="Define a name to be bound in the namespace before executing" - " the algotext. For example '-Dname=value'. The value may be any python" - " expression. These are evaluated in order so they may refer to previously" - " defined names.", + " the algotext. For example '-Dname=value'. The value may be any python" + " expression. These are evaluated in order so they may refer to previously" + " defined names.", ) @click.option( '--data-frequency', @@ -149,7 +152,7 @@ def ipython_only(option): default=pd.Timestamp.utcnow(), show_default=False, help='The date to lookup data on or before.\n' - '[default: ]' + '[default: ]' ) @click.option( '-s', @@ -170,7 +173,7 @@ def ipython_only(option): metavar='FILENAME', show_default=True, help="The location to write the perf data. If this is '-' the perf will" - " be written to stdout.", + " be written to stdout.", ) @click.option( '--print-algo/--no-print-algo', @@ -184,6 +187,29 @@ def ipython_only(option): default=None, help='Should the algorithm methods be resolved in the local namespace.' )) +@click.option( + '--live/--no-live', + is_flag=True, + default=False, + help='Enable live trading.', +) +@click.option( + '-x', + '--exchange-name', + type=click.Choice({'bitfinex', 'bittrex'}), + help='The name of the targeted exchange (supported: bitfinex, bittrex).', +) +@click.option( + '-n', + '--algo-namespace', + help='A label assigned to the algorithm for data storage purposes.' +) +@click.option( + '-c', + '--base-currency', + help='The base currency used to calculate statistics ' + '(e.g. usd, btc, eth).', +) @click.pass_context def run(ctx, algofile, @@ -197,21 +223,37 @@ def run(ctx, end, output, print_algo, - local_namespace): + local_namespace, + live, + exchange_name, + algo_namespace, + base_currency): """Run a backtest for the given algorithm. """ - # check that the start and end dates are passed correctly - if start is None and end is None: - # check both at the same time to avoid the case where a user - # does not pass either of these and then passes the first only - # to be told they need to pass the second argument also - ctx.fail( - "must specify dates with '-s' / '--start' and '-e' / '--end'", - ) - if start is None: - ctx.fail("must specify a start date with '-s' / '--start'") - if end is None: - ctx.fail("must specify an end date with '-e' / '--end'") + + if live: + if exchange_name is None: + ctx.fail("must specify an exchange name '-x' in live execution " + "mode '--live'") + if algo_namespace is None: + ctx.fail("must specify an algorithm name '-n' in live execution " + "mode '--live'") + if base_currency is None: + ctx.fail("must specify a base currency '-c' in live " + "execution mode '--live'") + else: + # check that the start and end dates are passed correctly + if start is None and end is None: + # check both at the same time to avoid the case where a user + # does not pass either of these and then passes the first only + # to be told they need to pass the second argument also + ctx.fail( + "must specify dates with '-s' / '--start' and '-e' / '--end'", + ) + if start is None: + ctx.fail("must specify a start date with '-s' / '--start'") + if end is None: + ctx.fail("must specify an end date with '-e' / '--end'") if (algotext is not None) == (algofile is not None): ctx.fail( @@ -238,6 +280,10 @@ def run(ctx, print_algo=print_algo, local_namespace=local_namespace, environ=os.environ, + live=live, + exchange=exchange_name, + algo_namespace=algo_namespace, + base_currency=base_currency ) if output == '-': @@ -265,11 +311,11 @@ def catalyst_magic(line, cell=None): '--algotext', cell, '--output', os.devnull, # don't write the results by default ] + ([ - # these options are set when running in line magic mode - # set a non None algo text to use the ipython user_ns - '--algotext', '', - '--local-namespace', - ] if cell is None else []) + line.split(), + # these options are set when running in line magic mode + # set a non None algo text to use the ipython user_ns + '--algotext', '', + '--local-namespace', + ] if cell is None else []) + line.split(), '%s%%catalyst' % ((cell or '') and '%'), # don't use system exit and propogate errors to the caller standalone_mode=False, @@ -335,14 +381,14 @@ def ingest(bundle, compile_locally, assets_version, show_progress): '--before', type=Timestamp(), help='Clear all data before TIMESTAMP.' - ' This may not be passed with -k / --keep-last', + ' This may not be passed with -k / --keep-last', ) @click.option( '-a', '--after', type=Timestamp(), help='Clear all data after TIMESTAMP' - ' This may not be passed with -k / --keep-last', + ' This may not be passed with -k / --keep-last', ) @click.option( '-k', @@ -350,7 +396,7 @@ def ingest(bundle, compile_locally, assets_version, show_progress): type=int, metavar='N', help='Clear all but the last N downloads.' - ' This may not be passed with -e / --before or -a / --after', + ' This may not be passed with -e / --before or -a / --after', ) def clean(bundle, before, after, keep_last): """Clean up data downloaded with the ingest command. diff --git a/catalyst/assets/_assets.pyx b/catalyst/assets/_assets.pyx index a06e4239..cdc34008 100644 --- a/catalyst/assets/_assets.pyx +++ b/catalyst/assets/_assets.pyx @@ -20,31 +20,31 @@ Cythonized Asset object. cimport cython from cpython.number cimport PyNumber_Index from cpython.object cimport ( - Py_EQ, - Py_NE, - Py_GE, - Py_LE, - Py_GT, - Py_LT, +Py_EQ, +Py_NE, +Py_GE, +Py_LE, +Py_GT, +Py_LT, ) from cpython cimport bool +import pandas as pd +from datetime import timedelta import numpy as np from numpy cimport int64_t import warnings cimport numpy as np from catalyst.utils.calendars import get_calendar - +from catalyst.exchange.exchange_errors import InvalidSymbolError, SidHashError # IMPORTANT NOTE: You must change this template if you change # Asset.__reduce__, or else we'll attempt to unpickle an old version of this # class CACHE_FILE_TEMPLATE = '/tmp/.%s-%s.v7.cache' - cdef class Asset: - cdef readonly int sid # Cached hash of self.sid cdef int sid_hash @@ -73,8 +73,8 @@ cdef class Asset: }) def __init__(self, - int sid, # sid is required - object exchange, # exchange is required + int sid, # sid is required + object exchange, # exchange is required object symbol="", object asset_name="", object start_date=None, @@ -230,9 +230,7 @@ cdef class Asset: calendar = get_calendar(self.exchange) return calendar.is_open_on_minute(dt_minute) - cdef class Equity(Asset): - def __repr__(self): attrs = ('symbol', 'asset_name', 'exchange', 'start_date', 'end_date', 'first_traded', 'auto_close_date', @@ -250,8 +248,8 @@ cdef class Equity(Asset): """ def __get__(self): warnings.warn("The security_start_date property will soon be " - "retired. Please use the start_date property instead.", - DeprecationWarning) + "retired. Please use the start_date property instead.", + DeprecationWarning) return self.start_date property security_end_date: @@ -261,8 +259,8 @@ cdef class Equity(Asset): """ def __get__(self): warnings.warn("The security_end_date property will soon be " - "retired. Please use the end_date property instead.", - DeprecationWarning) + "retired. Please use the end_date property instead.", + DeprecationWarning) return self.end_date property security_name: @@ -272,13 +270,11 @@ cdef class Equity(Asset): """ def __get__(self): warnings.warn("The security_name property will soon be " - "retired. Please use the asset_name property instead.", - DeprecationWarning) + "retired. Please use the asset_name property instead.", + DeprecationWarning) return self.asset_name - cdef class Future(Asset): - cdef readonly object root_symbol cdef readonly object notice_date cdef readonly object expiration_date @@ -303,8 +299,8 @@ cdef class Future(Asset): }) def __init__(self, - int sid, # sid is required - object exchange, # exchange is required + int sid, # sid is required + object exchange, # exchange is required object symbol="", object root_symbol="", object asset_name="", @@ -388,6 +384,160 @@ cdef class Future(Asset): super_dict['multiplier'] = self.multiplier return super_dict +cdef class TradingPair(Asset): + cdef readonly float leverage + cdef readonly object market_currency + cdef readonly object base_currency + + _kwargnames = frozenset({ + 'sid', + 'symbol', + 'asset_name', + 'start_date', + 'end_date', + 'first_traded', + 'auto_close_date', + 'exchange', + 'exchange_full', + 'leverage', + 'market_currency', + 'base_currency' + }) + def __init__(self, + object symbol, + object exchange, + object start_date=None, + object asset_name=None, + int sid=0, + float leverage=1.0, + object end_date=None, + object first_traded=None, + object auto_close_date=None, + object exchange_full=None): + """ + Replicates the Asset constructor with some built-in conventions + and a new 'leverage' attribute. + + Symbol + ------ + Catalyst defines its own set of "universal" symbols to reference + trading pairs across exchanges. This is required because exchanges + are not adhering to a universal symbolism. For example, Bitfinex + uses the BTC symbol for Bitcon while Kraken uses XBT. In addition, + pairs are sometimes presented differently. For example, Bitfinex + puts the market currency before the base currency without a + separator, Bittrex puts the base currency first and uses a dash + seperator. + + Here is the Catalyst convention: [Market Currency]_[Base Currency] + For example: btc_usd, eth_btc, neo_eth, ltc_eur. + + The symbol for each currency (e.g. btc, eth, ltc) is generally + aligned with the Bittrex exchange. + + Sid + --- + The sid of each asset is calculated based on a numeric hash of the + universal symbol. This simple approach avoids maintaining a mapping + of sids. + + Leverage + -------- + In contrast with equities, crypto exchanges generally assign + leverage values to specific trading pairs. Pairs with the + highest volume and market cap generally benefit from high leverage. + New currencies from ICO generally cannot be leveraged. + + The leverage value is either None or and integer. + + Leverage allows you to open a larger position with a smaller amount + of funds. For example, if you open a $5,000 position in BTC/USD + with 5:1 leverage, only one-fifth of this amount, or $1000, will be + tied to the position from your balance. Your remaining balance will + be available for opening more positions. If you open this same + position with 2:1 leverage, $2,500 of your balance will be tied to + the position. If you open with 1:1 leverage, $5,000 of your balance + will be tied to the position. + + :param symbol: + :param exchange: + :param start_date: + :param asset_name: + :param sid: + :param leverage: + :param end_date: + :param first_traded: + :param auto_close_date: + :param exchange_full: + """ + + symbol = symbol.lower() + try: + self.market_currency, self.base_currency = symbol.split('_') + except Exception as e: + raise InvalidSymbolError(symbol=symbol, error=e) + + if sid == 0 or sid is None: + try: + sid = abs(hash(symbol)) % (10 ** 4) + except Exception as e: + raise SidHashError(symbol=symbol) + + if asset_name is None: + asset_name = ' / '.join(symbol.split('_')).upper() + + if start_date is None: + start_date = pd.Timestamp.utcnow() + + if end_date is None: + end_date = pd.Timestamp.utcnow() + timedelta(days=365) + + super().__init__( + sid, + exchange, + symbol=symbol, + asset_name=asset_name, + start_date=start_date, + end_date=end_date, + first_traded=first_traded, + auto_close_date=auto_close_date, + exchange_full=exchange_full, + ) + + self.leverage = leverage + + def __repr__(self): + return 'Trading Pair {symbol}({sid}) Exchange: {exchange}, ' \ + 'Introduced On: {start_date}, ' \ + 'Market Currency: {market_currency}, ' \ + 'Base Currency: {base_currency}, ' \ + 'Exchange Leverage: {leverage}'.format( + symbol=self.symbol, + sid=self.sid, + exchange=self.exchange, + start_date=self.start_date, + market_currency=self.market_currency, + base_currency=self.base_currency, + leverage=self.leverage + ) + + cpdef __reduce__(self): + """ + Function used by pickle to determine how to serialize/deserialize this + class. Should return a tuple whose first element is self.__class__, + and whose second element is a tuple of all the attributes that should + be serialized/deserialized during pickling. + """ + return (self.__class__, (self.symbol, + self.exchange, + self.start_date, + self.asset_name, + self.sid, + self.leverage, + self.end_date, + self.first_traded, + self.auto_close_date, + self.exchange_full)) def make_asset_array(int size, Asset asset): cdef np.ndarray out = np.empty([size], dtype=object) diff --git a/catalyst/examples/buy_low_sell_high.py b/catalyst/examples/buy_low_sell_high.py new file mode 100644 index 00000000..e1459ee6 --- /dev/null +++ b/catalyst/examples/buy_low_sell_high.py @@ -0,0 +1,155 @@ +''' +This algorithm requires an additional library (ta-lib) beyond those required by catalyst. +Install it first by running: +$ pip install TA-Lib + +If you get build errors like "fatal error: ta-lib/ta_libc.h: No such file or directory" +it typically means that it can't find the underlying TA-Lib library and needs to be installed. +See https://mrjbq7.github.io/ta-lib/install.html for instructions on how to install +the required dependencies. +''' + +import talib +from logbook import Logger + +from catalyst.api import ( + order, + order_target_percent, + symbol, + record, + get_open_orders, +) +from catalyst.exchange.stats_utils import get_pretty_stats + +algo_namespace = 'buy_low_sell_high_xrp' +log = Logger(algo_namespace) + + +def initialize(context): + log.info('initializing algo') + context.ASSET_NAME = 'XRP_USD' + context.asset = symbol(context.ASSET_NAME) + + context.TARGET_POSITIONS = 5000 + context.PROFIT_TARGET = 0.1 + context.SLIPPAGE_ALLOWED = 0.05 + + context.retry_check_open_orders = 10 + context.retry_update_portfolio = 10 + context.retry_order = 5 + + context.errors = [] + pass + + +def _handle_data(context, data): + prices = data.history( + context.asset, + fields='price', + bar_count=20, + frequency='15m' + ) + rsi = talib.RSI(prices.values, timeperiod=14)[-1] + log.info('got rsi: {}'.format(rsi)) + + # Buying more when RSI is low, this should lower our cost basis + if rsi <= 30: + buy_increment = 50 + elif rsi <= 40: + buy_increment = 20 + elif rsi <= 70: + buy_increment = 5 + else: + buy_increment = None + + cash = context.portfolio.cash + log.info('base currency available: {cash}'.format(cash=cash)) + + price = data.current(context.asset, 'price') + log.info('got price {price}'.format(price=price)) + + record( + price=price, + rsi=rsi, + ) + + orders = get_open_orders(context.asset) + if orders: + log.info('skipping bar until all open orders execute') + return + + is_buy = False + cost_basis = None + if context.asset in context.portfolio.positions: + position = context.portfolio.positions[context.asset] + + cost_basis = position.cost_basis + log.info( + 'found {amount} positions with cost basis {cost_basis}'.format( + amount=position.amount, + cost_basis=cost_basis + ) + ) + + if position.amount >= context.TARGET_POSITIONS: + log.info('reached positions target: {}'.format(position.amount)) + return + + if price < cost_basis: + is_buy = True + elif position.amount > 0 and \ + price > cost_basis * (1 + context.PROFIT_TARGET): + profit = (price * position.amount) - (cost_basis * position.amount) + log.info('closing position, taking profit: {}'.format(profit)) + order_target_percent( + asset=context.asset, + target=0, + limit_price=price * (1 - context.SLIPPAGE_ALLOWED), + ) + else: + log.info('no buy or sell opportunity found') + else: + is_buy = True + + if is_buy: + if buy_increment is None: + log.info('the rsi is too high to consider buying {}'.format(rsi)) + return + + if price * buy_increment > cash: + log.info('not enough base currency to consider buying') + return + + log.info( + 'buying position cheaper than cost basis {} < {}'.format( + price, + cost_basis + ) + ) + order( + asset=context.asset, + amount=buy_increment, + limit_price=price * (1 + context.SLIPPAGE_ALLOWED) + ) + + +def handle_data(context, data): + log.info('handling bar {}'.format(data.current_dt)) + # try: + _handle_data(context, data) + # except Exception as e: + # log.warn('aborting the bar on error {}'.format(e)) + # context.errors.append(e) + + log.info('completed bar {}, total execution errors {}'.format( + data.current_dt, + len(context.errors) + )) + + if len(context.errors) > 0: + log.info('the errors:\n{}'.format(context.errors)) + + +def analyze(context, stats): + log.info('the daily stats:\n{}'.format(get_pretty_stats(stats))) + pass diff --git a/catalyst/exchange/__init__.py b/catalyst/exchange/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/algorithm_exchange.py b/catalyst/exchange/algorithm_exchange.py new file mode 100644 index 00000000..4fcc2517 --- /dev/null +++ b/catalyst/exchange/algorithm_exchange.py @@ -0,0 +1,452 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import signal +import sys +import pickle +from datetime import timedelta +from time import sleep +from os import listdir +from os.path import isfile, join +from collections import deque + +import logbook +import pandas as pd + +import catalyst.protocol as zp +from catalyst.algorithm import TradingAlgorithm +from catalyst.data.minute_bars import BcolzMinuteBarWriter, \ + BcolzMinuteBarReader +from catalyst.errors import OrderInBeforeTradingStart +from catalyst.exchange.exchange_clock import ExchangeClock +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangePortfolioDataError, + ExchangeTransactionError +) +from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root, \ + save_algo_object, get_algo_object, get_algo_folder +from catalyst.exchange.stats_utils import get_pretty_stats +from catalyst.finance.performance.period import calc_period_stats +from catalyst.gens.tradesimulation import AlgorithmSimulator +from catalyst.utils.api_support import ( + api_method, + disallowed_in_before_trading_start) +from catalyst.utils.input_validation import error_keywords + +log = logbook.Logger("ExchangeTradingAlgorithm") + + +class ExchangeAlgorithmExecutor(AlgorithmSimulator): + def __init__(self, *args, **kwargs): + super(self.__class__, self).__init__(*args, **kwargs) + + +class ExchangeTradingAlgorithm(TradingAlgorithm): + def __init__(self, *args, **kwargs): + self.exchange = kwargs.pop('exchange', None) + self.algo_namespace = kwargs.pop('algo_namespace', None) + self.orders = {} + self.minute_stats = deque(maxlen=60) + self.is_running = True + + self.retry_check_open_orders = 5 + self.retry_synchronize_portfolio = 5 + self.retry_get_open_orders = 5 + self.retry_order = 2 + self.retry_delay = 5 + + self.stats_minutes = 5 + + super(self.__class__, self).__init__(*args, **kwargs) + # self._create_minute_writer() + + signal.signal(signal.SIGINT, self.signal_handler) + + log.info('exchange trading algorithm successfully initialized') + + def _create_minute_writer(self): + root = get_exchange_minute_writer_root(self.exchange.name) + filename = os.path.join(root, 'metadata.json') + + if os.path.isfile(filename): + writer = BcolzMinuteBarWriter.open( + root, self.sim_params.end_session) + else: + writer = BcolzMinuteBarWriter( + rootdir=root, + calendar=self.trading_calendar, + minutes_per_day=1440, + start_session=self.sim_params.start_session, + end_session=self.sim_params.end_session, + write_metadata=True + ) + + self.exchange.minute_writer = writer + self.exchange.minute_reader = BcolzMinuteBarReader(root) + + def signal_handler(self, signal, frame): + self.is_running = False + + if self._analyze is None: + log.info('Interruption signal detected {}, exiting the ' + 'algorithm'.format(signal)) + + else: + log.info('Interruption signal detected {}, calling `analyze()` ' + 'before exiting the algorithm'.format(signal)) + + algo_folder = get_algo_folder(self.algo_namespace) + folder = join(algo_folder, 'daily_perf') + files = [f for f in listdir(folder) if isfile(join(folder, f))] + + daily_perf_list = [] + for item in files: + filename = join(folder, item) + with open(filename, 'rb') as handle: + daily_perf_list.append(pickle.load(handle)) + + stats = pd.DataFrame(daily_perf_list) + + self.analyze(stats) + + sys.exit(0) + + def _create_clock(self): + + # The calendar's execution times are the minutes over which we actually + # want to run the clock. Typically the execution times simply adhere to + # the market open and close times. In the case of the futures calendar, + # for example, we only want to simulate over a subset of the full 24 + # hour calendar, so the execution times dictate a market open time of + # 6:31am US/Eastern and a close of 5:00pm US/Eastern. + + # In our case, we are trading around the clock, so the market close + # corresponds to the last minute of the day. + + # This method is taken from TradingAlgorithm. + # The clock has been replaced to use RealtimeClock + # TODO: should we apply a time skew? not sure to understand the utility. + return ExchangeClock( + self.sim_params.sessions, + time_skew=self.exchange.time_skew + ) + + def _create_generator(self, sim_params): + if self.perf_tracker is None: + self.perf_tracker = get_algo_object( + algo_name=self.algo_namespace, + key='perf_tracker' + ) + + # Call the simulation trading algorithm for side-effects: + # it creates the perf tracker + TradingAlgorithm._create_generator(self, sim_params) + self.trading_client = ExchangeAlgorithmExecutor( + self, + sim_params, + self.data_portal, + self._create_clock(), + self._create_benchmark_source(), + self.restrictions, + universe_func=self._calculate_universe + ) + + return self.trading_client.transform() + + def updated_portfolio(self): + """ + We skip the entire performance tracker business and update the + portfolio directly. + :return: + """ + return self.exchange.portfolio + + def updated_account(self): + return self.exchange.account + + def _synchronize_portfolio(self, attempt_index=0): + try: + self.exchange.synchronize_portfolio() + + # Applying the updated last_sales_price to the positions + # in the performance tracker. This seems a bit redundant + # but it will make sense when we have multiple exchange portfolios + # feeding into the same performance tracker. + tracker = self.perf_tracker.todays_performance.position_tracker + for asset in self.exchange.portfolio.positions: + position = self.exchange.portfolio.positions[asset] + tracker.update_position( + asset=asset, + last_sale_date=position.last_sale_date, + last_sale_price=position.last_sale_price + ) + except ExchangeRequestError as e: + log.warn( + 'update portfolio attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_synchronize_portfolio: + sleep(self.retry_delay) + self._synchronize_portfolio(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='update-portfolio', + attempts=attempt_index, + error=e + ) + + def _check_open_orders(self, attempt_index=0): + try: + return self.exchange.check_open_orders() + except ExchangeRequestError as e: + log.warn( + 'check open orders attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_check_open_orders: + sleep(self.retry_delay) + return self._check_open_orders(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='order-status', + attempts=attempt_index, + error=e + ) + + def prepare_period_stats(self, start_dt, end_dt): + """ + Creates a dictionary representing the state of the tracker. + + + I rewrote this in an attempt to better control the stats. + I don't want things to happen magically through complex logic + pertaining to backtesting. + + """ + tracker = self.perf_tracker + period = tracker.todays_performance + + pos_stats = period.position_tracker.stats() + period_stats = calc_period_stats(pos_stats, period.ending_cash) + + stats = dict( + period_start=tracker.period_start, + period_end=tracker.period_end, + capital_base=tracker.capital_base, + progress=tracker.progress, + ending_value=period.ending_value, + ending_exposure=period.ending_exposure, + capital_used=period.cash_flow, + starting_value=period.starting_value, + starting_exposure=period.starting_exposure, + starting_cash=period.starting_cash, + ending_cash=period.ending_cash, + portfolio_value=period.ending_cash + period.ending_value, + pnl=period.pnl, + returns=period.returns, + period_open=period.period_open, + period_close=period.period_close, + gross_leverage=period_stats.gross_leverage, + net_leverage=period_stats.net_leverage, + short_exposure=pos_stats.short_exposure, + long_exposure=pos_stats.long_exposure, + short_value=pos_stats.short_value, + long_value=pos_stats.long_value, + longs_count=pos_stats.longs_count, + shorts_count=pos_stats.shorts_count, + ) + + # Merging cumulative risk + stats.update(tracker.cumulative_risk_metrics.to_dict()) + + # Merging latest recorded variables + stats.update(self.recorded_vars) + + stats['positions'] = period.position_tracker.get_positions_list() + + # we want the key to be absent, not just empty + # Only include transactions for given dt + stats['transactions'] = dict() + for date in period.processed_transactions: + if start_dt <= date < end_dt: + stats['transactions'][date] = \ + period.processed_transactions[date] + + stats['orders'] = dict() + for date in period.orders_by_modified: + if start_dt <= date < end_dt: + stats['orders'][date] = \ + period.orders_by_modified[date] + + return stats + + def handle_data(self, data): + if not self.is_running: + return + + self._synchronize_portfolio() + + transactions = self._check_open_orders() + for transaction in transactions: + self.perf_tracker.process_transaction(transaction) + + if self._handle_data: + self._handle_data(self, data) + + # Unlike trading controls which remain constant unless placing an + # order, account controls can change each bar. Thus, must check + # every bar no matter if the algorithm places an order or not. + self.validate_account_controls() + + try: + # Since the clock runs 24/7, I trying to disable the daily + # Performance tracker and keep only minute and cumulative + self.perf_tracker.update_performance() + + minute_stats = self.prepare_period_stats( + data.current_dt, data.current_dt + timedelta(minutes=1)) + # Saving the last hour in memory + self.minute_stats.append(minute_stats) + + print_df = pd.DataFrame(list(self.minute_stats)) + log.debug( + 'statistics for the last {stats_minutes} minutes:\n{stats}'.format( + stats_minutes=self.stats_minutes, + stats=get_pretty_stats(print_df, self.stats_minutes) + )) + + today = pd.to_datetime('today', utc=True) + daily_stats = self.prepare_period_stats( + start_dt=today, + end_dt=pd.Timestamp.utcnow() + ) + save_algo_object( + algo_name=self.algo_namespace, + key=today.strftime('%Y-%m-%d'), + obj=daily_stats, + rel_path='daily_perf' + ) + + except Exception as e: + log.warn('unable to calculate performance: {}'.format(e)) + + try: + save_algo_object( + algo_name=self.algo_namespace, + key='perf_tracker', + obj=self.perf_tracker + ) + except Exception as e: + log.warn('unable to save minute perfs to disk: {}'.format(e)) + + try: + save_algo_object( + algo_name=self.algo_namespace, + key='portfolio_{}'.format(self.exchange.name), + obj=self.exchange.portfolio + ) + except Exception as e: + log.warn('unable to save portfolio to disk: {}'.format(e)) + + def _order(self, + asset, + amount, + limit_price=None, + stop_price=None, + style=None, + attempt_index=0): + try: + return self.exchange.order(asset, amount, limit_price, + stop_price, + style) + except ExchangeRequestError as e: + log.warn( + 'order attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_order: + sleep(self.retry_delay) + return self._order( + asset, amount, limit_price, stop_price, style, + attempt_index + 1) + else: + raise ExchangeTransactionError( + transaction_type='order', + attempts=attempt_index, + error=e + ) + + @api_method + @disallowed_in_before_trading_start(OrderInBeforeTradingStart()) + def order(self, + asset, + amount, + limit_price=None, + stop_price=None, + style=None): + amount, style = self._calculate_order(asset, amount, + limit_price, stop_price, + style) + + order_id = self._order(asset, amount, limit_price, stop_price, style) + + if order_id is not None: + order = self.portfolio.open_orders[order_id] + self.perf_tracker.process_order(order) + + return order + + def round_order(self, amount): + """ + We need fractions with cryptocurrencies + + :param amount: + :return: + """ + return amount + + @api_method + def batch_market_order(self, share_counts): + raise NotImplementedError() + + def _get_open_orders(self, asset=None, attempt_index=0): + try: + return self.exchange.get_open_orders(asset) + except ExchangeRequestError as e: + log.warn( + 'open orders attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_open_orders: + sleep(self.retry_delay) + return self._get_open_orders(asset, attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='open-orders', + attempts=attempt_index, + error=e + ) + + @error_keywords(sid='Keyword argument `sid` is no longer supported for ' + 'get_open_orders. Use `asset` instead.') + @api_method + def get_open_orders(self, asset=None): + return self._get_open_orders(asset) + + @api_method + def get_order(self, order_id): + return self.exchange.get_order(order_id) + + @api_method + def cancel_order(self, order_param): + order_id = order_param + if isinstance(order_param, zp.Order): + order_id = order_param.id + self.exchange.cancel_order(order_id) diff --git a/catalyst/exchange/asset_finder_exchange.py b/catalyst/exchange/asset_finder_exchange.py new file mode 100644 index 00000000..2239f8a8 --- /dev/null +++ b/catalyst/exchange/asset_finder_exchange.py @@ -0,0 +1,91 @@ +from logbook import Logger + +log = Logger('AssetFinderExchange') + + +class AssetFinderExchange(object): + def __init__(self, exchange): + self.exchange = exchange + self._asset_cache = {} + + @property + def sids(self): + """ + This seems to be used to pre-fetch assets. + I don't think that we need this for live-trading. + Leaving the list empty. + """ + return list() + + def retrieve_all(self, sids, default_none=False): + """ + Retrieve all assets in `sids`. + + Parameters + ---------- + sids : iterable of int + Assets to retrieve. + default_none : bool + If True, return None for failed lookups. + If False, raise `SidsNotFound`. + + Returns + ------- + assets : list[Asset or None] + A list of the same length as `sids` containing Assets (or Nones) + corresponding to the requested sids. + + Raises + ------ + SidsNotFound + When a requested sid is not found and default_none=False. + """ + for sid in sids: + if sid in self._asset_cache: + log.info('got asset from cache: {}'.format(sid)) + else: + log.info('fetching asset: {}'.format(sid)) + return list() + + def lookup_symbol(self, symbol, as_of_date, fuzzy=False): + """Lookup an asset by symbol. + + Parameters + ---------- + symbol : str + The ticker symbol to resolve. + as_of_date : datetime or None + Look up the last owner of this symbol as of this datetime. + If ``as_of_date`` is None, then this can only resolve the equity + if exactly one equity has ever owned the ticker. + fuzzy : bool, optional + Should fuzzy symbol matching be used? Fuzzy symbol matching + attempts to resolve differences in representations for + shareclasses. For example, some people may represent the ``A`` + shareclass of ``BRK`` as ``BRK.A``, where others could write + ``BRK_A``. + + Returns + ------- + equity : Asset + The equity that held ``symbol`` on the given ``as_of_date``, or the + only equity to hold ``symbol`` if ``as_of_date`` is None. + + Raises + ------ + SymbolNotFound + Raised when no equity has ever held the given symbol. + MultipleSymbolsFound + Raised when no ``as_of_date`` is given and more than one equity + has held ``symbol``. This is also raised when ``fuzzy=True`` and + there are multiple candidates for the given ``symbol`` on the + ``as_of_date``. + """ + log.debug('looking up symbol: {}'.format(symbol)) + + if symbol in self._asset_cache: + return self._asset_cache[symbol] + else: + asset = self.exchange.get_asset(symbol) + self._asset_cache[symbol] = asset + return asset diff --git a/catalyst/exchange/bitfinex/__init__.py b/catalyst/exchange/bitfinex/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py new file mode 100644 index 00000000..fe32991a --- /dev/null +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -0,0 +1,529 @@ +import base64 +import hashlib +import hmac +import json +import re +import time + +import numpy as np +import pandas as pd +import pytz +import requests +import six +from catalyst.assets._assets import TradingPair +from logbook import Logger + +# from websocket import create_connection +from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + InvalidHistoryFrequencyError, + InvalidOrderStyle, OrderCancelError) +from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ + ExchangeStopLimitOrder, ExchangeStopOrder +from catalyst.finance.order import Order, ORDER_STATUS +from catalyst.protocol import Account + +# Trying to account for REST api instability +# https://stackoverflow.com/questions/15431044/can-i-set-max-retries-for-requests-request +requests.adapters.DEFAULT_RETRIES = 20 + +BITFINEX_URL = 'https://api.bitfinex.com' + +log = Logger('Bitfinex') +warning_logger = Logger('AlgoWarning') + + +class Bitfinex(Exchange): + def __init__(self, key, secret, base_currency, portfolio=None): + self.url = BITFINEX_URL + self.key = key + self.secret = secret.encode('UTF-8') + self.name = 'bitfinex' + self.assets = {} + self.load_assets() + self.base_currency = base_currency + self._portfolio = portfolio + self.minute_writer = None + self.minute_reader = None + + def _request(self, operation, data, version='v1'): + payload_object = { + 'request': '/{}/{}'.format(version, operation), + 'nonce': '{0:f}'.format(time.time() * 1000000), + # convert to string + 'options': {} + } + + if data is None: + payload_dict = payload_object + else: + payload_dict = payload_object.copy() + payload_dict.update(data) + + payload_json = json.dumps(payload_dict) + if six.PY3: + payload = base64.b64encode(bytes(payload_json, 'utf-8')) + else: + payload = base64.b64encode(payload_json) + + m = hmac.new(self.secret, payload, hashlib.sha384) + m = m.hexdigest() + + # headers + headers = { + 'X-BFX-APIKEY': self.key, + 'X-BFX-PAYLOAD': payload, + 'X-BFX-SIGNATURE': m + } + + if data is None: + request = requests.get( + '{url}/{version}/{operation}'.format( + url=self.url, + version=version, + operation=operation + ), data={}, + headers=headers) + else: + request = requests.post( + '{url}/{version}/{operation}'.format( + url=self.url, + version=version, + operation=operation + ), + headers=headers) + + return request + + def _get_v2_symbol(self, asset): + pair = asset.symbol.split('_') + symbol = 't' + pair[0].upper() + pair[1].upper() + return symbol + + def _get_v2_symbols(self, assets): + """ + Workaround to support Bitfinex v2 + TODO: Might require a separate asset dictionary + + :param assets: + :return: + """ + + v2_symbols = [] + for asset in assets: + v2_symbols.append(self._get_v2_symbol(asset)) + + return v2_symbols + + def _create_order(self, order_status): + """ + Create a Catalyst order object from a Bitfinex order dictionary + :param order_status: + :return: Order + """ + if order_status['is_cancelled']: + status = ORDER_STATUS.CANCELLED + elif not order_status['is_live']: + log.info('found executed order {}'.format(order_status)) + status = ORDER_STATUS.FILLED + else: + status = ORDER_STATUS.OPEN + + amount = float(order_status['original_amount']) + filled = float(order_status['executed_amount']) + + if order_status['side'] == 'sell': + amount = -amount + filled = -filled + + price = float(order_status['price']) + order_type = order_status['type'] + + stop_price = None + limit_price = None + + # TODO: is this comprehensive enough? + if order_type.endswith('limit'): + limit_price = price + elif order_type.endswith('stop'): + stop_price = price + + executed_price = float(order_status['avg_execution_price']) + + # TODO: bitfinex does not specify comission. I could calculate it but not sure if it's worth it. + commission = None + + date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) + date = pytz.utc.localize(date) + order = Order( + dt=date, + asset=self.assets[order_status['symbol']], + amount=amount, + stop=stop_price, + limit=limit_price, + filled=filled, + id=str(order_status['id']), + commission=commission + ) + order.status = status + + return order, executed_price + + def get_balances(self): + log.debug('retrieving wallets balances') + try: + response = self._request('balances', None) + balances = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in balances: + raise ExchangeRequestError( + error='unable to fetch balance {}'.format(balances['message']) + ) + + std_balances = dict() + for balance in balances: + currency = balance['currency'].lower() + std_balances[currency] = float(balance['available']) + + return std_balances + + @property + def account(self): + account = Account() + + account.settled_cash = None + account.accrued_interest = None + account.buying_power = None + account.equity_with_loan = None + account.total_positions_value = None + account.total_positions_exposure = None + account.regt_equity = None + account.regt_margin = None + account.initial_margin_requirement = None + account.maintenance_margin_requirement = None + account.available_funds = None + account.excess_liquidity = None + account.cushion = None + account.day_trades_remaining = None + account.leverage = None + account.net_leverage = None + account.net_liquidation = None + + return account + + @property + def time_skew(self): + # TODO: research the time skew conditions + return pd.Timedelta('0s') + + def get_account(self): + # TODO: fetch account data and keep in cache + return None + + def get_candles(self, data_frequency, assets, bar_count=None): + """ + Retrieve OHLVC candles from Bitfinex + + :param data_frequency: + :param assets: + :param bar_count: + :return: + + Available Frequencies + --------------------- + '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', + '1M' + """ + + # TODO: use BcolzMinuteBarReader to read from cache + freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) + if freq_match: + number = int(freq_match.group(1)) + unit = freq_match.group(2) + + if unit == 'd': + converted_unit = 'D' + else: + converted_unit = unit + + frequency = '{}{}'.format(number, converted_unit) + allowed_frequencies = ['1m', '5m', '15m', '30m', '1h', '3h', '6h', + '12h', '1D', '7D', '14D', '1M'] + + if frequency not in allowed_frequencies: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + elif data_frequency == 'minute': + frequency = '1m' + elif data_frequency == 'daily': + frequency = '1D' + else: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + + # Making sure that assets are iterable + asset_list = [assets] if isinstance(assets, TradingPair) else assets + ohlc_map = dict() + for asset in asset_list: + symbol = self._get_v2_symbol(asset) + url = '{url}/v2/candles/trade:{frequency}:{symbol}'.format( + url=self.url, + frequency=frequency, + symbol=symbol + ) + + if bar_count: + is_list = True + url += '/hist?limit={}'.format(int(bar_count)) + else: + is_list = False + url += '/last' + + try: + response = requests.get(url) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve candles: {}'.format( + response.content) + ) + + candles = response.json() + + def ohlc_from_candle(candle): + ohlc = dict( + open=np.float64(candle[1]), + high=np.float64(candle[3]), + low=np.float64(candle[4]), + close=np.float64(candle[2]), + volume=np.float64(candle[5]), + price=np.float64(candle[2]), + last_traded=pd.Timestamp.utcfromtimestamp( + candle[0] / 1000.0) + ) + return ohlc + + if is_list: + ohlc_bars = [] + # We can to list candles from old to new + for candle in reversed(candles): + ohlc = ohlc_from_candle(candle) + ohlc_bars.append(ohlc) + + ohlc_map[asset] = ohlc_bars + + else: + ohlc = ohlc_from_candle(candles) + ohlc_map[asset] = ohlc + + return ohlc_map[assets] \ + if isinstance(assets, TradingPair) else ohlc_map + + def create_order(self, asset, amount, is_buy, style): + """ + Creating order on the exchange. + + :param asset: + :param amount: + :param is_buy: + :param style: + :return: + """ + exchange_symbol = self.get_symbol(asset) + if isinstance(style, ExchangeLimitOrder) \ + or isinstance(style, ExchangeStopLimitOrder): + price = style.get_limit_price(is_buy) + order_type = 'limit' + + elif isinstance(style, ExchangeStopOrder): + price = style.get_stop_price(is_buy) + order_type = 'stop' + + else: + raise InvalidOrderStyle(exchange=self.name, + style=style.__class__.__name__) + + req = dict( + symbol=exchange_symbol, + amount=str(float(abs(amount))), + price="{:.20f}".format(float(price)), + side='buy' if is_buy else 'sell', + type='exchange ' + order_type, # TODO: support margin trades + exchange=self.name, + is_hidden=False, + is_postonly=False, + use_all_available=0, + ocoorder=False, + buy_price_oco=0, + sell_price_oco=0 + ) + + date = pd.Timestamp.utcnow() + try: + response = self._request('order/new', req) + order_status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_status: + raise ExchangeRequestError( + error='unable to create Bitfinex order {}'.format( + order_status['message']) + ) + + order_id = str(order_status['id']) + order = Order( + dt=date, + asset=asset, + amount=amount, + stop=style.get_stop_price(is_buy), + limit=style.get_limit_price(is_buy), + id=order_id + ) + + return order + + def get_open_orders(self, asset=None): + """Retrieve all of the current open orders. + + Parameters + ---------- + asset : Asset + If passed and not None, return only the open orders for the given + asset instead of all open orders. + + Returns + ------- + open_orders : dict[list[Order]] or list[Order] + If no asset is passed this will return a dict mapping Assets + to a list containing all the open orders for the asset. + If an asset is passed then this will return a list of the open + orders for this asset. + """ + try: + response = self._request('orders', None) + order_statuses = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_statuses: + raise ExchangeRequestError( + error='Unable to retrieve open orders: {}'.format( + order_statuses['message']) + ) + + orders = list() + for order_status in order_statuses: + order, executed_price = self._create_order(order_status) + if asset is None or asset == order.sid: + orders.append(order) + + return orders + + def get_order(self, order_id): + """Lookup an order based on the order id returned from one of the + order functions. + + Parameters + ---------- + order_id : str + The unique identifier for the order. + + Returns + ------- + order : Order + The order object. + """ + try: + response = self._request( + 'order/status', {'order_id': int(order_id)}) + order_status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_status: + raise ExchangeRequestError( + error='Unable to retrieve order status: {}'.format( + order_status['message']) + ) + return self._create_order(order_status) + + def cancel_order(self, order_param): + """Cancel an open order. + + Parameters + ---------- + order_param : str or Order + The order_id or order object to cancel. + """ + order_id = order_param.id \ + if isinstance(order_param, Order) else order_param + + try: + response = self._request('order/cancel', {'order_id': order_id}) + status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in status: + raise OrderCancelError( + order_id=order_id, + exchange=self.name, + error=status['message'] + ) + + def tickers(self, assets): + """ + Fetch ticket data for assets + https://docs.bitfinex.com/v2/reference#rest-public-tickers + + :param assets: + :return: + """ + symbols = self._get_v2_symbols(assets) + log.debug('fetching tickers {}'.format(symbols)) + + try: + response = requests.get( + '{url}/v2/tickers?symbols={symbols}'.format( + url=self.url, + symbols=','.join(symbols), + ) + ) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve tickers: {}'.format( + response.content) + ) + + tickers = response.json() + + ticks = dict() + for index, ticker in enumerate(tickers): + if not len(ticker) == 11: + raise ExchangeRequestError( + error='Invalid ticker in response: {}'.format(ticker) + ) + + ticks[assets[index]] = dict( + timestamp=pd.Timestamp.utcnow(), + bid=ticker[1], + ask=ticker[3], + last_price=ticker[7], + low=ticker[10], + high=ticker[9], + volume=ticker[8], + ) + + log.debug('got tickers {}'.format(ticks)) + return ticks diff --git a/catalyst/exchange/bitfinex/symbols.json b/catalyst/exchange/bitfinex/symbols.json new file mode 100644 index 00000000..8ab44191 --- /dev/null +++ b/catalyst/exchange/bitfinex/symbols.json @@ -0,0 +1,114 @@ +{ + "btcusd": { + "symbol": "btc_usd", + "start_date": "2010-01-01" + }, + "bchusd": { + "symbol": "bch_usd", + "start_date": "2010-01-01" + }, + "ltcusd": { + "symbol": "ltc_usd", + "start_date": "2010-01-01" + }, + "ltcbtc": { + "symbol": "ltc_btc", + "start_date": "2010-01-01" + }, + "ethusd": { + "symbol": "eth_usd", + "start_date": "2010-01-01" + }, + "ethbtc": { + "symbol": "eth_btc", + "start_date": "2010-01-01" + }, + "etcbtc": { + "symbol": "etc_btc", + "start_date": "2010-01-01" + }, + "etcusd": { + "symbol": "etc_usd", + "start_date": "2010-01-01" + }, + "rrtusd": { + "symbol": "rrt_usd", + "start_date": "2010-01-01" + }, + "rrtbtc": { + "symbol": "rrt_btc", + "start_date": "2010-01-01" + }, + "zecusd": { + "symbol": "zec_usd", + "start_date": "2010-01-01" + }, + "zecbtc": { + "symbol": "zec_btc", + "start_date": "2010-01-01" + }, + "xmrusd": { + "symbol": "xmr_usd", + "start_date": "2010-01-01" + }, + "xmrbtc": { + "symbol": "xmr_btc", + "start_date": "2010-01-01" + }, + "dshusd": { + "symbol": "dsh_usd", + "start_date": "2010-01-01" + }, + "dshbtc": { + "symbol": "dsh_btc", + "start_date": "2010-01-01" + }, + "bccbtc": { + "symbol": "bcc_btc", + "start_date": "2010-01-01" + }, + "bcubtc": { + "symbol": "bcu_btc", + "start_date": "2010-01-01" + }, + "bccusd": { + "symbol": "bcc_usd", + "start_date": "2010-01-01" + }, + "bcuusd": { + "symbol": "bcu_usd", + "start_date": "2010-01-01" + }, + "xrpusd": { + "symbol": "xrp_usd", + "start_date": "2010-01-01" + }, + "xrpbtc": { + "symbol": "xrp_btc", + "start_date": "2010-01-01" + }, + "iotusd": { + "symbol": "iot_usd", + "start_date": "2010-01-01" + }, + "iotbtc": { + "symbol": "iot_btc", + "start_date": "2010-01-01" + }, + "ioteth": { + "symbol": "iot_eth", + "start_date": "2010-01-01" + }, + "eosusd": { + "symbol": "eos_usd", + "start_date": "2010-01-01" + }, + "eosbtc": { + "symbol": "eos_btc", + "start_date": "2010-01-01" + }, + "eoseth": { + "symbol": "eos_eth", + "start_date": "2010-01-01" + } +} \ No newline at end of file diff --git a/catalyst/exchange/bittrex/__init__.py b/catalyst/exchange/bittrex/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/bittrex/bittrex.py b/catalyst/exchange/bittrex/bittrex.py new file mode 100644 index 00000000..dfd1a496 --- /dev/null +++ b/catalyst/exchange/bittrex/bittrex.py @@ -0,0 +1,307 @@ +import json + +import pandas as pd +from catalyst.assets._assets import TradingPair +from logbook import Logger +from six.moves import urllib + +from catalyst.exchange.bittrex.bittrex_api import Bittrex_api +from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ + ExchangeRequestError, InvalidOrderStyle, OrderNotFound, OrderCancelError, \ + CreateOrderError +from catalyst.finance.execution import LimitOrder, StopLimitOrder +from catalyst.finance.order import Order, ORDER_STATUS + +log = Logger('Bittrex') + +URL2 = 'https://bittrex.com/Api/v2.0' + + +class Bittrex(Exchange): + def __init__(self, key, secret, base_currency, portfolio=None): + self.api = Bittrex_api(key=key, secret=secret.encode('UTF-8')) + self.name = 'bittrex' + self.base_currency = base_currency + self._portfolio = portfolio + + self.minute_writer = None + self.minute_reader = None + + self.assets = dict() + self.load_assets() + + @property + def account(self): + pass + + @property + def time_skew(self): + # TODO: research the time skew conditions + return pd.Timedelta('0s') + + def sanitize_curency_symbol(self, exchange_symbol): + """ + Helper method used to build the universal pair. + Include any symbol mapping here if appropriate. + + :param exchange_symbol: + :return universal_symbol: + """ + return exchange_symbol.lower() + + def fetch_symbol_map(self): + """ + Since Bittrex gives us a complete dictionary of symbols, + we can build the symbol map ad-hoc as opposed to maintaining + a static file. We must be careful with mapping any unconventional + symbol name as appropriate. + + :return symbol_map: + """ + symbol_map = dict() + + markets = self.api.getmarkets() + for market in markets: + exchange_symbol = market['MarketName'] + symbol = '{market}_{base}'.format( + market=self.sanitize_curency_symbol(market['MarketCurrency']), + base=self.sanitize_curency_symbol(market['BaseCurrency']) + ) + symbol_map[exchange_symbol] = dict( + symbol=symbol, + start_date=pd.to_datetime(market['Created'], utc=True) + ) + + return symbol_map + + def get_balances(self): + try: + log.debug('retrieving wallet balances') + balances = self.api.getbalances() + except Exception as e: + raise ExchangeRequestError(error=e) + + std_balances = dict() + for balance in balances: + currency = balance['Currency'].lower() + std_balances[currency] = balance['Available'] + return std_balances + + def create_order(self, asset, amount, is_buy, style): + log.info('creating {} order'.format('buy' if is_buy else 'sell')) + exchange_symbol = self.get_symbol(asset) + + if isinstance(style, LimitOrder) or isinstance(style, StopLimitOrder): + if isinstance(style, StopLimitOrder): + log.warn('{} will ignore the stop price'.format(self.name)) + + price = style.get_limit_price(is_buy) + try: + if is_buy: + order_status = self.api.buylimit(exchange_symbol, amount, + price) + else: + order_status = self.api.selllimit(exchange_symbol, + abs(amount), price) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'uuid' in order_status: + order_id = order_status['uuid'] + order = Order( + dt=pd.Timestamp.utcnow(), + asset=asset, + amount=amount, + stop=style.get_stop_price(is_buy), + limit=style.get_limit_price(is_buy), + id=order_id + ) + return order + else: + raise CreateOrderError(exchange=self.name, error=order_status) + else: + raise InvalidOrderStyle(exchange=self.name, + style=style.__class__.__name__) + + def get_open_orders(self, asset): + symbol = self.get_symbol(asset) + try: + open_orders = self.api.getopenorders(symbol) + except Exception as e: + raise ExchangeRequestError(error=e) + + orders = list() + for order_status in open_orders: + order = self._create_order(order_status) + orders.append(order) + + return orders + + def _create_order(self, order_status): + log.info( + 'creating catalyst order from Bittrex {}'.format(order_status)) + if order_status['CancelInitiated']: + status = ORDER_STATUS.CANCELLED + elif order_status['Closed'] is not None: + status = ORDER_STATUS.FILLED + else: + status = ORDER_STATUS.OPEN + + date = pd.to_datetime(order_status['Opened'], utc=True) + amount = order_status['Quantity'] + filled = amount - order_status['QuantityRemaining'] + order = Order( + dt=date, + asset=self.assets[order_status['Exchange']], + amount=amount, + stop=None, # Not yet supported by Bittrex + limit=order_status['Limit'], + filled=filled, + id=order_status['OrderUuid'], + commission=order_status['CommissionPaid'] + ) + order.status = status + + executed_price = order_status['PricePerUnit'] + + return order, executed_price + + def get_order(self, order_id): + log.info('retrieving order {}'.format(order_id)) + try: + order_status = self.api.getorder(order_id) + except Exception as e: + raise ExchangeRequestError(error=e) + + if order_status is None: + raise OrderNotFound(order_id=order_id, exchange=self.name) + + return self._create_order(order_status) + + def cancel_order(self, order_param): + order_id = order_param.id \ + if isinstance(order_param, Order) else order_param + log.info('cancelling order {}'.format(order_id)) + + try: + status = self.api.cancel(order_id) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in status: + raise OrderCancelError( + order_id=order_id, + exchange=self.name, + error=status['message'] + ) + + def get_candles(self, data_frequency, assets, bar_count=None): + """ + Supported Intervals + ------------------- + day, oneMin, fiveMin, thirtyMin, hour + + :param data_frequency: + :param assets: + :param bar_count: + :return: + """ + log.info('retrieving candles') + + if data_frequency == 'minute' or data_frequency == '1m': + frequency = 'oneMin' + elif data_frequency == '5m': + frequency = 'fiveMin' + elif data_frequency == '30m': + frequency = 'thirtyMin' + elif data_frequency == '1h': + frequency = 'hour' + elif data_frequency == 'daily' or data_frequency == '1D': + frequency = 'day' + else: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + + # Making sure that assets are iterable + asset_list = [assets] if isinstance(assets, TradingPair) else assets + ohlc_map = dict() + for asset in asset_list: + url = '{url}/pub/market/GetTicks?marketName={symbol}' \ + '&tickInterval={frequency}&_=1499127220008'.format( + url=URL2, + symbol=self.get_symbol(asset), + frequency=frequency + ) + + try: + data = json.loads(urllib.request.urlopen(url).read().decode()) + except Exception as e: + raise ExchangeRequestError(error=e) + + if data['message']: + raise ExchangeRequestError( + error='Unable to fetch candles {}'.format(data['message']) + ) + + candles = data['result'] + + def ohlc_from_candle(candle): + ohlc = dict( + open=candle['O'], + high=candle['H'], + low=candle['L'], + close=candle['C'], + volume=candle['V'], + price=candle['C'], + last_traded=pd.to_datetime(candle['T'], utc=True) + ) + return ohlc + + ordered_candles = list(reversed(candles)) + if bar_count is None: + ohlc_map[asset] = ohlc_from_candle(ordered_candles[0]) + else: + ohlc_bars = [] + for candle in ordered_candles[:bar_count]: + ohlc = ohlc_from_candle(candle) + ohlc_bars.append(ohlc) + + ohlc_map[asset] = ohlc_bars + + return ohlc_map[assets] \ + if isinstance(assets, TradingPair) else ohlc_map + + def tickers(self, assets): + """ + As of v1.1, Bittrex only allows one ticker at the time. + So we have to make multiple calls to fetch multiple assets. + + :param assets: + :return: + """ + log.info('retrieving tickers') + + ticks = dict() + for asset in assets: + symbol = self.get_symbol(asset) + try: + ticker = self.api.getticker(symbol) + except Exception as e: + raise ExchangeRequestError(error=e) + + # TODO: catch invalid ticker + ticks[asset] = dict( + timestamp=pd.Timestamp.utcnow(), + bid=ticker['Bid'], + ask=ticker['Ask'], + last_price=ticker['Last'] + ) + + log.debug('got tickers {}'.format(ticks)) + return ticks + + def get_account(self): + log.info('retrieving account data') + pass diff --git a/catalyst/exchange/bittrex/bittrex_api.py b/catalyst/exchange/bittrex/bittrex_api.py new file mode 100644 index 00000000..cda7581e --- /dev/null +++ b/catalyst/exchange/bittrex/bittrex_api.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +import json +import time +import hmac +import hashlib + +from six.moves import urllib + +# Workaround for backwards compatibility +# https://stackoverflow.com/questions/3745771/urllib-request-in-python-2-7 +urlopen = urllib.request.urlopen + + +class Bittrex_api(object): + def __init__(self, key, secret): + self.key = key + self.secret = secret + self.public = ['getmarkets', 'getcurrencies', 'getticker', + 'getmarketsummaries', 'getmarketsummary', + 'getorderbook', 'getmarkethistory'] + self.market = ['buylimit', 'buymarket', 'selllimit', 'sellmarket', + 'cancel', 'getopenorders'] + self.account = ['getbalances', 'getbalance', 'getdepositaddress', + 'withdraw', 'getorder', 'getorderhistory', + 'getwithdrawalhistory', 'getdeposithistory'] + + def query(self, method, values={}): + if method in self.public: + url = 'https://bittrex.com/api/v1.1/public/' + elif method in self.market: + url = 'https://bittrex.com/api/v1.1/market/' + elif method in self.account: + url = 'https://bittrex.com/api/v1.1/account/' + else: + return 'Something went wrong, sorry.' + + url += method + '?' + urllib.parse.urlencode(values) + + if method not in self.public: + url += '&apikey=' + self.key + url += '&nonce=' + str(int(time.time())) + signature = hmac.new(self.secret, url, hashlib.sha512).hexdigest() + headers = {'apisign': signature} + else: + headers = {} + + req = urllib.request.Request(url, headers=headers) + response = json.loads(urlopen(req).read()) + + if response["result"]: + return response["result"] + else: + return response["message"] + + def getmarkets(self): + return self.query('getmarkets') + + def getcurrencies(self): + return self.query('getcurrencies') + + def getticker(self, market): + return self.query('getticker', {'market': market}) + + def getmarketsummaries(self): + return self.query('getmarketsummaries') + + def getmarketsummary(self, market): + return self.query('getmarketsummary', {'market': market}) + + def getorderbook(self, market, type, depth=20): + return self.query('getorderbook', + {'market': market, 'type': type, 'depth': depth}) + + def getmarkethistory(self, market, count=20): + return self.query('getmarkethistory', + {'market': market, 'count': count}) + + def buylimit(self, market, quantity, rate): + return self.query('buylimit', {'market': market, 'quantity': quantity, + 'rate': rate}) + + def buymarket(self, market, quantity): + return self.query('buymarket', + {'market': market, 'quantity': quantity}) + + def selllimit(self, market, quantity, rate): + return self.query('selllimit', {'market': market, 'quantity': quantity, + 'rate': rate}) + + def sellmarket(self, market, quantity): + return self.query('sellmarket', + {'market': market, 'quantity': quantity}) + + def cancel(self, uuid): + return self.query('cancel', {'uuid': uuid}) + + def getopenorders(self, market): + return self.query('getopenorders', {'market': market}) + + def getbalances(self): + return self.query('getbalances') + + def getbalance(self, currency): + return self.query('getbalance', {'currency': currency}) + + def getdepositaddress(self, currency): + return self.query('getdepositaddress', {'currency': currency}) + + def withdraw(self, currency, quantity, address): + return self.query('withdraw', + {'currency': currency, 'quantity': quantity, + 'address': address}) + + def getorder(self, uuid): + return self.query('getorder', {'uuid': uuid}) + + def getorderhistory(self, market, count): + return self.query('getorderhistory', + {'market': market, 'count': count}) + + def getwithdrawalhistory(self, currency, count): + return self.query('getwithdrawalhistory', + {'currency': currency, 'count': count}) + + def getdeposithistory(self, currency, count): + return self.query('getdeposithistory', + {'currency': currency, 'count': count}) diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py new file mode 100644 index 00000000..77a7cb76 --- /dev/null +++ b/catalyst/exchange/data_portal_exchange.py @@ -0,0 +1,121 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +from logbook import Logger + +from catalyst.data.data_portal import DataPortal +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangeBarDataError +) + +log = Logger('DataPortalExchange') + + +class DataPortalExchange(DataPortal): + def __init__(self, exchange, *args, **kwargs): + self.exchange = exchange + + # TODO: put somewhere accessible by each algo + self.retry_get_history_window = 5 + self.retry_get_spot_value = 5 + self.retry_delay = 5 + + super(DataPortalExchange, self).__init__(*args, **kwargs) + + def _get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True, + attempt_index=0): + try: + return self.exchange.get_history_window( + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + except ExchangeRequestError as e: + log.warn( + 'get history attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_history_window: + sleep(self.retry_delay) + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='history', + attempts=attempt_index, + error=e + ) + + def get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True): + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + + def _get_spot_value(self, assets, field, dt, data_frequency, + attempt_index=0): + try: + return self.exchange.get_spot_value(assets, field, dt, + data_frequency) + except ExchangeRequestError as e: + log.warn( + 'get spot value attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_spot_value: + sleep(self.retry_delay) + return self._get_spot_value(assets, field, dt, data_frequency, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='spot', + attempts=attempt_index, + error=e + ) + + def get_spot_value(self, assets, field, dt, data_frequency): + return self._get_spot_value(assets, field, dt, data_frequency) + + def get_adjusted_value(self, asset, field, dt, + perspective_dt, + data_frequency, + spot_value=None): + # TODO: does this pertain to cryptocurrencies? + raise NotImplementedError("get_adjusted_value is not implemented yet!") diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py new file mode 100644 index 00000000..33712813 --- /dev/null +++ b/catalyst/exchange/exchange.py @@ -0,0 +1,619 @@ +import abc +import collections +import random +from abc import ABCMeta, abstractmethod, abstractproperty +from time import sleep + +import numpy as np +import pandas as pd +from catalyst.assets._assets import TradingPair +from logbook import Logger + +from catalyst.data.data_portal import BASE_FIELDS +from catalyst.errors import ( + SymbolNotFound, +) +from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \ + InvalidOrderStyle, BaseCurrencyNotFoundError +from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ + ExchangeLimitOrder, ExchangeStopOrder +from catalyst.exchange.exchange_portfolio import ExchangePortfolio +from catalyst.exchange.exchange_utils import get_exchange_symbols +from catalyst.finance.order import ORDER_STATUS +from catalyst.finance.transaction import Transaction + +log = Logger('Exchange') + + +class Exchange: + __metaclass__ = ABCMeta + + def __init__(self): + self.name = None + self.trading_pairs = None + self.assets = {} + self._portfolio = None + self.minute_writer = None + self.minute_reader = None + self.base_currency = None + + @property + def positions(self): + return self.portfolio.positions + + @property + def portfolio(self): + """ + Return the Portfolio + + :return: + """ + if self._portfolio is None: + self._portfolio = ExchangePortfolio( + start_date=pd.Timestamp.utcnow() + ) + self.synchronize_portfolio() + + return self._portfolio + + @abstractproperty + def account(self): + pass + + @abstractproperty + def time_skew(self): + pass + + def get_symbol(self, asset): + """ + Get the exchange specific symbol of the given asset. + + :param asset: Asset + :return: symbol: str + """ + symbol = None + + for key in self.assets: + if not symbol and self.assets[key].symbol == asset.symbol: + symbol = key + + if not symbol: + raise ValueError('Currency %s not supported by exchange %s' % + (asset['symbol'], self.name)) + + return symbol + + def get_symbols(self, assets): + """ + Get a list of symbols corresponding to each given asset. + + :param assets: Asset[] + :return: + """ + symbols = [] + + for asset in assets: + symbols.append(self.get_symbol(asset)) + + return symbols + + def get_asset(self, symbol): + """ + Find an Asset on the current exchange based on its Catalyst symbol + :param symbol: the [target]_[base] currency pair symbol + :return: Asset + """ + asset = None + + for key in self.assets: + if not asset and self.assets[key].symbol.lower() == symbol.lower(): + asset = self.assets[key] + + if not asset: + raise SymbolNotFound(symbol=symbol) + + return asset + + def fetch_symbol_map(self): + return get_exchange_symbols(self.name) + + def load_assets(self): + """ + Populate the 'assets' attribute with a dictionary of Assets. + The key of the resulting dictionary is the exchange specific + currency pair symbol. The universal symbol is contained in the + 'symbol' attribute of each asset. + + + Notes + ----- + The sid of each asset is calculated based on a numeric hash of the + universal symbol. This simple approach avoids maintaining a mapping + of sids. + + This method can be overridden if an exchange offers equivalent data + via its api. + """ + + symbol_map = self.fetch_symbol_map() + for exchange_symbol in symbol_map: + asset = symbol_map[exchange_symbol] + + if 'start_date' in asset: + start_date = pd.to_datetime(asset['start_date'], utc=True) + else: + start_date = None + + if 'end_date' in asset: + end_date = pd.to_datetime(asset['end_date'], utc=True) + else: + end_date = None + + if 'leverage' in asset: + leverage = asset['leverage'] + else: + leverage = 1.0 + + if 'asset_name' in asset: + asset_name = asset['asset_name'] + else: + asset_name = None + + trading_pair = TradingPair( + symbol=asset['symbol'], + exchange=self.name, + start_date=start_date, + end_date=end_date, + leverage=leverage, + asset_name=asset_name + ) + + self.assets[exchange_symbol] = trading_pair + + def check_open_orders(self): + """ + Loop through the list of open orders in the Portfolio object. + For each executed order found, create a transaction and apply to the + Portfolio. + + :return: + transactions: Transaction[] + """ + transactions = list() + if self.portfolio.open_orders: + for order_id in list(self.portfolio.open_orders): + log.debug('found open order: {}'.format(order_id)) + + order, executed_price = self.get_order(order_id) + log.debug('got updated order {} {}'.format( + order, executed_price)) + + if order.status == ORDER_STATUS.FILLED: + transaction = Transaction( + asset=order.asset, + amount=order.amount, + dt=pd.Timestamp.utcnow(), + price=executed_price, + order_id=order.id, + commission=order.commission + ) + transactions.append(transaction) + + self.portfolio.execute_order(order, transaction) + + elif order.status == ORDER_STATUS.CANCELLED: + self.portfolio.remove_order(order) + + else: + delta = pd.Timestamp.utcnow() - order.dt + log.info( + 'order {order_id} still open after {delta}'.format( + order_id=order_id, + delta=delta + ) + ) + return transactions + + def get_spot_value(self, assets, field, dt=None, data_frequency='minute'): + """ + Public API method that returns a scalar value representing the value + of the desired asset's field at either the given dt. + + Parameters + ---------- + assets : Asset, ContinuousFuture, or iterable of same. + The asset or assets whose data is desired. + field : {'open', 'high', 'low', 'close', 'volume', + 'price', 'last_traded'} + The desired field of the asset. + dt : pd.Timestamp + The timestamp for the desired value. + data_frequency : str + The frequency of the data to query; i.e. whether the data is + 'daily' or 'minute' bars + + Returns + ------- + value : float, int, or pd.Timestamp + The spot value of ``field`` for ``asset`` The return type is based + on the ``field`` requested. If the field is one of 'open', 'high', + 'low', 'close', or 'price', the value will be a float. If the + ``field`` is 'volume' the value will be a int. If the ``field`` is + 'last_traded' the value will be a Timestamp. + + Bitfinex timeframes + ------------------- + Available values: '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', + '1D', '7D', '14D', '1M' + """ + if field not in BASE_FIELDS: + raise KeyError('Invalid column: ' + str(field)) + + if isinstance(assets, collections.Iterable): + values = list() + for asset in assets: + value = self.get_single_spot_value( + asset, field, data_frequency) + values.append(value) + + return values + else: + return self.get_single_spot_value( + assets, field, data_frequency) + + def get_single_spot_value(self, asset, field, data_frequency): + """ + Similar to 'get_spot_value' but for a single asset + + Note + ---- + We're writing each minute bar to disk using zipline's machinery. + This is especially useful when running multiple algorithms + concurrently. By using local data when possible, we try to reaching + request limits on exchanges. + + :param asset: + :param field: + :param data_frequency: + :return value: The spot value of the given asset / field + """ + log.debug( + 'fetching spot value {field} for symbol {symbol}'.format( + symbol=asset.symbol, + field=field + ) + ) + + if field == 'price': + field = 'close' + + # Don't use a timezone here + dt = pd.Timestamp.utcnow().floor('1 min') + value = None + if self.minute_reader is not None: + try: + # Slight delay to minimize the chances that multiple algos + # might try to hit the cache at the exact same time. + sleep_time = random.uniform(0.5, 0.8) + sleep(sleep_time) + # TODO: This does not always! Why is that? Open an issue with zipline. + # See: https://github.com/zipline-live/zipline/issues/26 + value = self.minute_reader.get_value( + sid=asset.sid, + dt=dt, + field=field + ) + except Exception as e: + log.warn('minute data not found: {}'.format(e)) + + if value is None or np.isnan(value): + ohlc = self.get_candles(data_frequency, asset) + if field not in ohlc: + raise KeyError('Invalid column: %s' % field) + + if self.minute_writer is not None: + df = pd.DataFrame( + [ohlc], + index=pd.DatetimeIndex([dt]), + columns=['open', 'high', 'low', 'close', 'volume'] + ) + + try: + self.minute_writer.write_sid( + sid=asset.sid, + df=df + ) + log.debug('wrote minute data: {}'.format(dt)) + except Exception as e: + log.warn( + 'unable to write minute data: {} {}'.format(dt, e)) + + value = ohlc[field] + log.debug('got spot value: {}'.format(value)) + else: + log.debug('got spot value from cache: {}'.format(value)) + + return value + + def get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True): + + """ + Public API method that returns a dataframe containing the requested + history window. Data is fully adjusted. + + Parameters + ---------- + assets : list of catalyst.data.Asset objects + The assets whose data is desired. + + end_dt: not applicable to cryptocurrencies + + bar_count: int + The number of bars desired. + + frequency: string + "1d" or "1m" + + field: string + The desired field of the asset. + + data_frequency: string + The frequency of the data to query; i.e. whether the data is + 'daily' or 'minute' bars. + + # TODO: fill how? + ffill: boolean + Forward-fill missing values. Only has effect if field + is 'price'. + + Returns + ------- + A dataframe containing the requested data. + """ + + candles = self.get_candles( + data_frequency=frequency, + assets=assets, + bar_count=bar_count, + ) + + series = dict() + for asset in assets: + asset_candles = candles[asset] + + values = map(lambda candle: candle[field], asset_candles) + dates = map(lambda candle: candle['last_traded'], asset_candles) + + value_series = pd.Series(values, index=dates) + series[asset] = value_series + + df = pd.concat(series) + return df + + def synchronize_portfolio(self): + """ + Update the portfolio cash and position balances based on the + latest ticker prices. + + :return: + """ + log.debug('synchronizing portfolio with exchange {}'.format(self.name)) + balances = self.get_balances() + + base_position_available = balances[self.base_currency] \ + if self.base_currency in balances else None + + if base_position_available is None: + raise BaseCurrencyNotFoundError( + base_currency=self.base_currency, + exchange=self.name + ) + + portfolio = self._portfolio + portfolio.cash = base_position_available + log.debug('found base currency balance: {}'.format(portfolio.cash)) + + if portfolio.starting_cash is None: + portfolio.starting_cash = portfolio.cash + + if portfolio.positions: + assets = portfolio.positions.keys() + tickers = self.tickers(assets) + + portfolio.positions_value = 0.0 + for asset in tickers: + # TODO: convert if the position is not in the base currency + ticker = tickers[asset] + position = portfolio.positions[asset] + position.last_sale_price = ticker['last_price'] + position.last_sale_date = ticker['timestamp'] + + portfolio.positions_value += \ + position.amount * position.last_sale_price + portfolio.portfolio_value = \ + portfolio.positions_value + portfolio.cash + + @abstractmethod + def get_balances(self): + """ + Retrieve wallet balances for the exchange + :return balances: A dict of currency => available balance + """ + pass + + @abstractmethod + def create_order(self, asset, amount, is_buy, style): + pass + + def order(self, asset, amount, limit_price=None, stop_price=None, + style=None): + """Place an order. + + Parameters + ---------- + asset : Asset + The asset that this order is for. + amount : int + The amount of shares to order. If ``amount`` is positive, this is + the number of shares to buy or cover. If ``amount`` is negative, + this is the number of shares to sell or short. + limit_price : float, optional + The limit price for the order. + stop_price : float, optional + The stop price for the order. + style : ExecutionStyle, optional + The execution style for the order. + + Returns + ------- + order_id : str or None + The unique identifier for this order, or None if no order was + placed. + + Notes + ----- + The ``limit_price`` and ``stop_price`` arguments provide shorthands for + passing common execution styles. Passing ``limit_price=N`` is + equivalent to ``style=LimitOrder(N)``. Similarly, passing + ``stop_price=M`` is equivalent to ``style=StopOrder(M)``, and passing + ``limit_price=N`` and ``stop_price=M`` is equivalent to + ``style=StopLimitOrder(N, M)``. It is an error to pass both a ``style`` + and ``limit_price`` or ``stop_price``. + + See Also + -------- + :class:`catalyst.finance.execution.ExecutionStyle` + :func:`catalyst.api.order_value` + :func:`catalyst.api.order_percent` + """ + if amount == 0: + log.warn('skipping order amount of 0') + return None + + if asset.base_currency != self.base_currency.lower(): + raise MismatchingBaseCurrencies( + base_currency=asset.base_currency, + algo_currency=self.base_currency + ) + + is_buy = (amount > 0) + + if limit_price is not None and stop_price is not None: + style = ExchangeStopLimitOrder(limit_price, stop_price, + exchange=self.name) + elif limit_price is not None: + style = ExchangeLimitOrder(limit_price, exchange=self.name) + + elif stop_price is not None: + style = ExchangeStopOrder(stop_price, exchange=self.name) + + elif style is not None: + raise InvalidOrderStyle(exchange=self.name, + style=style.__class__.__name__) + else: + raise ValueError('Incomplete order data.') + + display_price = limit_price if limit_price is not None else stop_price + log.debug( + 'issuing {side} order of {amount} {symbol} for {type}: {price}'.format( + side='buy' if is_buy else 'sell', + amount=amount, + symbol=asset.symbol, + type=style.__class__.__name__, + price='{}{}'.format(display_price, asset.base_currency) + ) + ) + order = self.create_order(asset, amount, is_buy, style) + + self._portfolio.create_order(order) + + return order.id + + @abstractmethod + def get_open_orders(self, asset): + """Retrieve all of the current open orders. + + Parameters + ---------- + asset : Asset + If passed and not None, return only the open orders for the given + asset instead of all open orders. + + Returns + ------- + open_orders : dict[list[Order]] or list[Order] + If no asset is passed this will return a dict mapping Assets + to a list containing all the open orders for the asset. + If an asset is passed then this will return a list of the open + orders for this asset. + """ + pass + + @abstractmethod + def get_order(self, order_id): + """Lookup an order based on the order id returned from one of the + order functions. + + Parameters + ---------- + order_id : str + The unique identifier for the order. + + Returns + ------- + order : Order + The order object. + execution_price: float + The execution price per share of the order + """ + pass + + @abstractmethod + def cancel_order(self, order_param): + """Cancel an open order. + + Parameters + ---------- + order_param : str or Order + The order_id or order object to cancel. + """ + pass + + @abstractmethod + def get_candles(self, data_frequency, assets, bar_count=None): + """ + Retrieve OHLCV candles for the given assets + + :param data_frequency: + :param assets: + :param end_dt: + :param bar_count: + :param limit: + :return: + """ + pass + + @abc.abstractmethod + def tickers(self, assets): + """ + Retrieve current tick data for the given assets + + :param assets: + :return: + """ + pass + + @abc.abstractmethod + def get_account(self): + """ + Retrieve the account parameters. + :return: + """ + pass diff --git a/catalyst/exchange/exchange_clock.py b/catalyst/exchange/exchange_clock.py new file mode 100644 index 00000000..4e180816 --- /dev/null +++ b/catalyst/exchange/exchange_clock.py @@ -0,0 +1,60 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +import pandas as pd +from catalyst.gens.sim_engine import ( + BAR, + SESSION_START, + MINUTE_END, + SESSION_END +) +from logbook import Logger + +log = Logger('ExchangeClock') + + +class ExchangeClock(object): + """Realtime clock for live trading. + + This class is a drop-in replacement for + :class:`zipline.gens.sim_engine.MinuteSimulationClock`. + + This is a stripped down version because crypto exchanges run around the clock. + + The :param:`time_skew` parameter represents the time difference between + the Broker and the live trading machine's clock. + """ + + def __init__(self, sessions, time_skew=pd.Timedelta("0s")): + + self.sessions = sessions + self.time_skew = time_skew + self._last_emit = None + self._before_trading_start_bar_yielded = True + + def __iter__(self): + yield pd.Timestamp.utcnow(), SESSION_START + + while True: + current_time = pd.Timestamp.utcnow() + current_minute = current_time.floor('1 min') + + if self._last_emit is None or current_minute > self._last_emit: + log.debug('emitting minutely bar: {}'.format(current_minute)) + + self._last_emit = current_minute + yield current_minute, BAR + else: + sleep(1) diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py new file mode 100644 index 00000000..0cfa35b3 --- /dev/null +++ b/catalyst/exchange/exchange_errors.py @@ -0,0 +1,113 @@ +from catalyst.errors import ZiplineError + + +class ExchangeRequestError(ZiplineError): + msg = ( + 'Request failed: {error}' + ).strip() + + +class ExchangeRequestErrorTooManyAttempts(ZiplineError): + msg = ( + 'Request failed: {error}, giving up after {attempts} attempts' + ).strip() + + +class ExchangeBarDataError(ZiplineError): + msg = ( + 'Unable to retrieve bar data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangePortfolioDataError(ZiplineError): + msg = ( + 'Unable to retrieve portfolio data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangeTransactionError(ZiplineError): + msg = ( + 'Unable to execute transaction: {transaction_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangeAuthNotFound(ZiplineError): + msg = ( + 'Please create an auth.json file containing the api token and key for ' + 'exchange {exchange}. Place the file here: {filename}' + ).strip() + + +class ExchangeSymbolsNotFound(ZiplineError): + msg = ( + 'Unable to download or find a local copy of symbols.json for exchange ' + '{exchange}. The file should be here: {filename}' + ).strip() + + +class AlgoPickleNotFound(ZiplineError): + msg = ( + 'Pickle not found for algo {algo} in path {filename}' + ).strip() + + +class InvalidHistoryFrequencyError(ZiplineError): + msg = ( + 'History frequency {frequency} not supported by the exchange.' + ).strip() + + +class InvalidSymbolError(ZiplineError): + msg = ( + 'Invalid trading pair symbol: {symbol}. ' + 'Catalyst symbols must follow this convention: ' + '[Market Currency]_[Base Currency]. For example: eth_usd, btc_usd, ' + 'neo_eth, ubq_btc. Error details: {error}' + ).strip() + + +class InvalidOrderStyle(ZiplineError): + msg = ( + 'Order style {style} not supported by exchange {exchange}.' + ).strip() + + +class CreateOrderError(ZiplineError): + msg = ( + 'Unable to create order on exchange {exchange} {error}.' + ).strip() + + +class OrderNotFound(ZiplineError): + msg = ( + 'Order {order_id} not found on exchange {exchange}.' + ).strip() + + +class OrderCancelError(ZiplineError): + msg = ( + 'Unable to cancel order {order_id} on exchange {exchange} {error}.' + ).strip() + + +class SidHashError(ZiplineError): + msg = ( + 'Unable to hash sid from symbol {symbol}.' + ).strip() + + +class BaseCurrencyNotFoundError(ZiplineError): + msg = ( + 'Algorithm base currency {base_currency} not found in exchange ' + '{exchange}.' + ).strip() + + +class MismatchingBaseCurrencies(ZiplineError): + msg = ( + 'Unable to trade with base currency {base_currency} when the ' + 'algorithm uses {algo_currency}.' + ).strip() diff --git a/catalyst/exchange/exchange_execution.py b/catalyst/exchange/exchange_execution.py new file mode 100644 index 00000000..6527678e --- /dev/null +++ b/catalyst/exchange/exchange_execution.py @@ -0,0 +1,39 @@ +from catalyst.finance.execution import LimitOrder, StopOrder, StopLimitOrder + + +class ExchangeLimitOrder(LimitOrder): + def get_limit_price(self, is_buy): + """ + We may be trading Satoshis with 8 decimals, we cannot round numbers + :param is_buy: + :return: + """ + return self.limit_price + + +class ExchangeStopOrder(StopOrder): + def get_stop_price(self, is_buy): + """ + We may be trading Satoshis with 8 decimals, we cannot round numbers + :param is_buy: + :return: + """ + return self.stop_price + + +class ExchangeStopLimitOrder(StopLimitOrder): + def get_limit_price(self, is_buy): + """ + We may be trading Satoshis with 8 decimals, we cannot round numbers + :param is_buy: + :return: + """ + return self.limit_price + + def get_stop_price(self, is_buy): + """ + We may be trading Satoshis with 8 decimals, we cannot round numbers + :param is_buy: + :return: + """ + return self.stop_price diff --git a/catalyst/exchange/exchange_portfolio.py b/catalyst/exchange/exchange_portfolio.py new file mode 100644 index 00000000..ded8a2a4 --- /dev/null +++ b/catalyst/exchange/exchange_portfolio.py @@ -0,0 +1,87 @@ +import numpy as np +from logbook import Logger + +from catalyst.protocol import Portfolio, Positions, Position + +log = Logger('ExchangePortfolio') + + +class ExchangePortfolio(Portfolio): + """ + Since the goal is to support multiple exchanges, it makes sense to + include additional stats in the portfolio object. + + Instead of relying on the performance tracker, each exchange portfolio + tracks its own holding. This offers a separation between tracking an + exchange and the statistics of the algorithm. + """ + + def __init__(self, start_date, starting_cash=None): + self.capital_used = 0.0 + self.starting_cash = starting_cash + self.portfolio_value = starting_cash + self.pnl = 0.0 + self.returns = 0.0 + self.cash = starting_cash + self.positions = Positions() + self.start_date = start_date + self.positions_value = 0.0 + self.open_orders = dict() + + def calculate_pnl(self): + log.debug('calculating pnl') + + def create_order(self, order): + log.debug('creating order {}'.format(order.id)) + self.open_orders[order.id] = order + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + order_position = Position(order.asset) + self.positions[order.asset] = order_position + + order_position.amount += order.amount + log.debug('open order added to portfolio') + + def execute_order(self, order, transaction): + log.debug('executing order {}'.format(order.id)) + del self.open_orders[order.id] + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + raise ValueError( + 'Trying to execute order for a position not held: %s' % order.id + ) + + self.capital_used += order.amount * transaction.price + + if order.amount > 0: + if order_position.cost_basis > 0: + order_position.cost_basis = np.average( + [order_position.cost_basis, transaction.price], + weights=[order_position.amount, order.amount] + ) + else: + order_position.cost_basis = transaction.price + + log.debug('updated portfolio with executed order') + + def remove_order(self, order): + log.info('removing cancelled order {}'.format(order.id)) + del self.open_orders[order.id] + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + raise ValueError( + 'Trying to remove order for a position not held: %s' % order.id + ) + + order_position.amount -= order.amount + + log.debug('removed order from portfolio') diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py new file mode 100644 index 00000000..3b559726 --- /dev/null +++ b/catalyst/exchange/exchange_utils.py @@ -0,0 +1,133 @@ +import json +import os +import pickle +import urllib +from datetime import date, datetime + +from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \ + ExchangeSymbolsNotFound +from catalyst.utils.paths import data_root, ensure_directory + +SYMBOLS_URL = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \ + 'exchange-trading/catalyst/exchange/{exchange}/symbols.json' + + +def get_exchange_folder(exchange_name, environ=None): + if not environ: + environ = os.environ + + root = data_root(environ) + exchange_folder = os.path.join(root, 'exchanges', exchange_name) + ensure_directory(exchange_folder) + + return exchange_folder + + +def download_exchange_symbols(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'symbols.json') + + url = SYMBOLS_URL.format(exchange=exchange_name) + response = urllib.urlretrieve(url=url, filename=filename) + return response + + +def get_exchange_symbols(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'symbols.json') + + if not os.path.isfile(filename): + download_exchange_symbols(exchange_name, environ) + + if os.path.isfile(filename): + with open(filename) as data_file: + data = json.load(data_file) + return data + else: + raise ExchangeSymbolsNotFound( + exchange=exchange_name, + filename=filename + ) + + +def get_exchange_auth(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'auth.json') + + if os.path.isfile(filename): + with open(filename) as data_file: + data = json.load(data_file) + return data + else: + raise ExchangeAuthNotFound( + exchange=exchange_name, + filename=filename + ) + + +def get_algo_folder(algo_name, environ=None): + if not environ: + environ = os.environ + + root = data_root(environ) + algo_folder = os.path.join(root, 'live_algos', algo_name) + ensure_directory(algo_folder) + + return algo_folder + + +def get_algo_object(algo_name, key, environ=None, rel_path=None): + folder = get_algo_folder(algo_name, environ) + + if rel_path is not None: + folder = os.path.join(folder, rel_path) + + filename = os.path.join(folder, key + '.p') + + if os.path.isfile(filename): + try: + with open(filename, 'rb') as handle: + return pickle.load(handle) + except Exception as e: + return None + else: + return None + + +def save_algo_object(algo_name, key, obj, environ=None, rel_path=None): + folder = get_algo_folder(algo_name, environ) + + if rel_path is not None: + folder = os.path.join(folder, rel_path) + ensure_directory(folder) + + filename = os.path.join(folder, key + '.p') + + with open(filename, 'wb') as handle: + pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def append_algo_object(algo_name, key, obj, environ=None): + algo_folder = get_algo_folder(algo_name, environ) + filename = os.path.join(algo_folder, key + '.p') + + mode = 'a+b' if os.path.isfile(filename) else 'wb' + with open(filename, mode) as handle: + pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def get_exchange_minute_writer_root(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + + minute_data_folder = os.path.join(exchange_folder, 'minute_data') + ensure_directory(minute_data_folder) + + return minute_data_folder + + +def perf_serial(obj): + """JSON serializer for objects not serializable by default json code""" + + if isinstance(obj, (datetime, date)): + return obj.isoformat() + raise TypeError("Type %s not serializable" % type(obj)) diff --git a/catalyst/exchange/stats_utils.py b/catalyst/exchange/stats_utils.py new file mode 100644 index 00000000..eda1b2fd --- /dev/null +++ b/catalyst/exchange/stats_utils.py @@ -0,0 +1,47 @@ +import pandas as pd + + +def get_pretty_stats(stats_df, num_rows=10): + """ + Format and print the last few rows of a statistics DataFrame. + See the pyfolio project for the data structure. + + :param stats_df: + :param num_rows: + :return: + """ + stats_df.set_index('period_close', drop=True, inplace=True) + stats_df.dropna(axis=1, how='all', inplace=True) + + pd.set_option('display.expand_frame_repr', False) + pd.set_option('precision', 3) + pd.set_option('display.width', 1000) + pd.set_option('display.max_colwidth', 1000) + + columns = ['starting_cash', 'ending_cash', 'portfolio_value', + 'pnl', 'long_exposure', 'short_exposure', 'orders', + 'transactions', 'positions'] + + def format_positions(positions): + parts = [] + for position in positions: + msg = '{amount:.2f}{market} cost basis {cost_basis:.4f}{base}'.format( + amount=position['amount'], + market=position['sid'].market_currency, + cost_basis=position['cost_basis'], + base=position['sid'].base_currency + ) + parts.append(msg) + return ', '.join(parts) + + formatters = { + 'orders': lambda orders: len(orders), + 'transactions': lambda transactions: len(transactions), + 'returns': lambda returns: "{0:.4f}".format(returns), + 'positions': format_positions + } + + return stats_df.tail(num_rows).to_string( + columns=columns, + formatters=formatters + ) diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 228d05a3..42f4f349 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -3,12 +3,20 @@ import re from runpy import run_path import sys import warnings +from time import sleep +from datetime import timedelta + +import pandas as pd import click + +from catalyst.exchange.bittrex.bittrex import Bittrex + try: from pygments import highlight from pygments.lexers import PythonLexer from pygments.formatters import TerminalFormatter + PYGMENTS = True except: PYGMENTS = False @@ -29,6 +37,21 @@ from catalyst.utils.calendars import get_calendar from catalyst.utils.factory import create_simulation_parameters import catalyst.utils.paths as pth +from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm +from catalyst.exchange.data_portal_exchange import DataPortalExchange +from catalyst.exchange.bitfinex.bitfinex import Bitfinex +from catalyst.exchange.asset_finder_exchange import AssetFinderExchange +from catalyst.exchange.exchange_portfolio import ExchangePortfolio +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangeRequestErrorTooManyAttempts, + BaseCurrencyNotFoundError) +from catalyst.exchange.exchange_utils import get_exchange_auth, \ + get_algo_object +from logbook import Logger + +log = Logger('run_algo') + class _RunAlgoError(click.ClickException, ValueError): """Signal an error that should have a different message if invoked from @@ -68,7 +91,11 @@ def _run(handle_data, output, print_algo, local_namespace, - environ): + environ, + live, + exchange, + algo_namespace, + base_currency): """Run a backtest for the given algorithm. This is shared between the cli and :func:`catalyst.run_algo`. @@ -117,7 +144,110 @@ def _run(handle_data, else: click.echo(algotext) - if bundle is not None: + mode = 'live' if live else 'backtest' + log.info('running algo in {mode} mode'.format(mode=mode)) + + if live and exchange is not None: + exchange_name = exchange + start = pd.Timestamp.utcnow() + end = start + timedelta(minutes=1439) + + portfolio = get_algo_object( + algo_name=algo_namespace, + key='portfolio_{}'.format(exchange_name), + environ=environ + ) + if portfolio is None: + portfolio = ExchangePortfolio( + start_date=pd.Timestamp.utcnow() + ) + + exchange_auth = get_exchange_auth(exchange_name) + if exchange_name == 'bitfinex': + exchange = Bitfinex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=base_currency, + portfolio=portfolio + ) + elif exchange_name == 'bittrex': + exchange = Bittrex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=base_currency, + portfolio=portfolio + ) + else: + raise NotImplementedError( + 'exchange not supported: %s' % exchange_name) + + open_calendar = get_calendar('OPEN') + sim_params = create_simulation_parameters( + start=start, + end=end, + capital_base=capital_base, + data_frequency=data_frequency, + emission_rate=data_frequency, + ) + + if live and exchange is not None: + env = TradingEnvironment( + environ=environ, + exchange_tz='UTC', + asset_db_path=None + ) + env.asset_finder = AssetFinderExchange(exchange) + + data = DataPortalExchange( + exchange=exchange, + asset_finder=env.asset_finder, + trading_calendar=open_calendar, + first_trading_day=pd.to_datetime('today', utc=True) + ) + choose_loader = None + + def fetch_capital_base(attempt_index=0): + """ + Fetch the base currency amount required to bootstrap + the algorithm against the exchange. + + The algorithm cannot continue without this value. + + :param attempt_index: + :return capital_base: the amount of base currency available for + trading + """ + try: + log.debug('retrieving capital base in {} to bootstrap ' + 'exchange {}'.format(base_currency, exchange_name)) + balances = exchange.get_balances() + except ExchangeRequestError as e: + if attempt_index < 20: + sleep(5) + return fetch_capital_base(attempt_index + 1) + else: + raise ExchangeRequestErrorTooManyAttempts( + attempts=attempt_index, + error=e + ) + + if base_currency in balances: + return balances[base_currency] + else: + raise BaseCurrencyNotFoundError( + base_currency=base_currency, + exchange=exchange_name + ) + + sim_params = create_simulation_parameters( + start=start, + end=end, + capital_base=fetch_capital_base(), + emission_rate='minute', + data_frequency='minute' + ) + + elif bundle is not None: bundles = bundle.split(',') def get_trading_env_and_data(bundles): @@ -146,8 +276,6 @@ def _run(handle_data, str(bundle_data.asset_finder.engine.url), ) - open_calendar = get_calendar('OPEN') - env = TradingEnvironment( load=partial(load_crypto_market_data, bundle=b, bundle_data=bundle_data, environ=environ), bm_symbol='USDT_BTC', @@ -179,16 +307,16 @@ def _run(handle_data, if b == 'poloniex': return CryptoPricingLoader( - bundle_data, - data_frequency, - CryptoPricing, - ) + bundle_data, + data_frequency, + CryptoPricing, + ) elif b == 'quandl': return USEquityPricingLoader( - bundle_data, - data_frequency, - USEquityPricing, - ) + bundle_data, + data_frequency, + USEquityPricing, + ) raise ValueError( "No PipelineLoader registered for bundle %s." % b ) @@ -208,17 +336,16 @@ def _run(handle_data, env = TradingEnvironment(environ=environ) choose_loader = None - perf = TradingAlgorithm( + TradingAlgorithmClass = ( + partial(ExchangeTradingAlgorithm, exchange=exchange, + algo_namespace=algo_namespace) + if live and exchange else TradingAlgorithm) + + perf = TradingAlgorithmClass( namespace=namespace, env=env, get_pipeline_loader=choose_loader, - sim_params=create_simulation_parameters( - start=start, - end=end, - capital_base=capital_base, - data_frequency=data_frequency, - emission_rate=data_frequency, - ), + sim_params=sim_params, **{ 'initialize': initialize, 'handle_data': handle_data, @@ -294,10 +421,10 @@ def load_extensions(default, extensions, strict, environ, reload=False): _loaded_extensions.add(ext) -def run_algorithm(start, - end, - initialize, - capital_base, +def run_algorithm(initialize, + capital_base=None, + start=None, + end=None, handle_data=None, before_trading_start=None, analyze=None, @@ -308,7 +435,11 @@ def run_algorithm(start, default_extension=True, extensions=(), strict_extensions=True, - environ=os.environ): + environ=os.environ, + live=False, + exchange_name=None, + base_currency=None, + algo_namespace=None): """Run a trading algorithm. Parameters @@ -362,6 +493,12 @@ def run_algorithm(start, environ : mapping[str -> str], optional The os environment to use. Many extensions use this to get parameters. This defaults to ``os.environ``. + live: execute live trading + exchange_conn: The exchange connection parameters + + Supported Exchanges + ------------------- + bitfinex Returns ------- @@ -374,25 +511,25 @@ def run_algorithm(start, """ load_extensions(default_extension, extensions, strict_extensions, environ) - non_none_data = valfilter(bool, { - 'data': data is not None, - 'bundle': bundle is not None, - }) - if not non_none_data: - # if neither data nor bundle are passed use 'quantopian-quandl' - bundle = 'quantopian-quandl' + if not live: + non_none_data = valfilter(bool, { + 'data': data is not None, + 'bundle': bundle is not None, + }) + if not non_none_data: + # if neither data nor bundle are passed use 'quantopian-quandl' + bundle = 'quantopian-quandl' - elif len(non_none_data) != 1: - raise ValueError( - 'must specify one of `data`, `data_portal`, or `bundle`,' - ' got: %r' % non_none_data, - ) - - elif 'bundle' not in non_none_data and bundle_timestamp is not None: - raise ValueError( - 'cannot specify `bundle_timestamp` without passing `bundle`', - ) + elif len(non_none_data) != 1: + raise ValueError( + 'must specify one of `data`, `data_portal`, or `bundle`,' + ' got: %r' % non_none_data, + ) + elif 'bundle' not in non_none_data and bundle_timestamp is not None: + raise ValueError( + 'cannot specify `bundle_timestamp` without passing `bundle`', + ) return _run( handle_data=handle_data, initialize=initialize, @@ -412,4 +549,8 @@ def run_algorithm(start, print_algo=False, local_namespace=False, environ=environ, + live=live, + exchange=exchange_name, + algo_namespace=algo_namespace, + base_currency=base_currency ) diff --git a/docs/live-trading-blueprint.md b/docs/live-trading-blueprint.md new file mode 100644 index 00000000..d26638f2 --- /dev/null +++ b/docs/live-trading-blueprint.md @@ -0,0 +1,207 @@ +

Live Trading Blueprint

+The purpose of this document is to allow project contributors navigate +through the ongoing live trading implementation. + +

Components

+At a high level, the following components have been implemented to coerce +zipline into live trading. + +

Exchange

+ +*catalyst/exchange* + +Exchange is a new package introducing cryptocurrency +exchanges to zipline. The package contains mostly new implementations +of existing components, adapted to characteristics of exchanges. + +Here are some key characteristics which make cryptocurrency exchanges +exchanges different compared to equity brokers. +* They trade around the clock. +* Currency symbols are inconsistent across exchanges. +* They trade currency pairs (i.e. the base currency is not always be USD). +This is a paradigm shift in context of zipline. Additional +business logic will be required to manage the portfolio data and orders. +* The price of a single asset might vary across exchanges. This means +arbitrage opportunities. Consequently, to extract maximum alpha, the +platform should not only support multiple exchanges, but also multiple +exchanges per algorithm. +* The fee model is usually more complex than that of an equity broker. +It can vary drastically between exchanges. +* There are no splits, mergers, etc. to worry about. +* A complete order book is usually available, the platform should +offer access to it order to help traders reduce slippage. + +

New Components

+These components of the exchange package were added to the zipline +sources. + +

Exchange

+ +*catalyst/exchange/exchange.py* + +Abstract class which acts as an interface for the implementation of +various exchanges. It also contains logic common to all exchanges. + +

Bitfinex

+ +*catalyst/exchange/bitfinex.py* + +The Bitfinex exchange implementation. It extends the Exchange class. + +

DataPortalExchange

+ +*catalyst/exchange/data_portal_exchange.py* + +Extends the zipline DataPortal to route spot data to the exchange. +This is critical because it allows the algoritm to request data in +real-time. + +For example, `data.current(asset, 'price')` retrieves the current price +of the asset, not the price at the time of yielding the bar this +is critical to minimize slippage. + +At the time of writing, it only supports spot data but I believe that +it should be extended to historical data as well. Some exchanges +have better historical data APIs than others. This will need to +be considered during each individual implementation. + +

ExchangeClock

+ +*catalyst/exchange/exchange_clock.py* + +An implementation to the zipline Clock which runs 24/7. It yields a +bar every minute. + +

AssetFinderExchange

+ +*catalyst/exchange/asset_finder_exchange.py* + +An alternate implementation of AssetFinder which locates each asset +against the exchanges instead of bundle databases. + +For example, `symbol('eth_usd')` should return an Ethereum/USD asset +regardless of currency notation of the target exchange. + +To acheive this, I have created a dictionary of currencies for the +Bitfinex exchange. Here is what it looks like. +* Each key represents the exchange specific symbol. +* The symbol attribute represents the abstract symbol common across +all exchanges for the given currency. +* The start_date attribute should correspond to its first trading day +on the exchange. + +```json +{ + "btcusd": { + "symbol": "btc_usd", + "start_date": "2010-01-01" + }, + "ltcusd": { + "symbol": "ltc_usd", + "start_date": "2010-01-01" + }, + "ltcbtc": { + "symbol": "ltc_btc", + "start_date": "2010-01-01" + }, + "ethusd": { + "symbol": "eth_usd", + "start_date": "2010-01-01" + }, + "ethbtc": { + "symbol": "eth_btc", + "start_date": "2010-01-01" + } +} +``` + +

ExchangeTradingAlgorithm

+ +*catalyst/exchange/algorithm_exchange.py* + +Extends the TradingAlgorithm class which orchestrates the api +operations. This class brings together most of the components +described above. + +

Modified Components

+ +The following components have been modified to include conditional +business logic to enable live trading. + +

run_algorithm

+ +*catalyst/utils/run_algo.py* + +The run_algorithm interface is an entry point to execute an +algorithm in zipline. This component was already modified for +the catalyst concurrency bundles. I added conditional logic +which should not interfere with backtesting. + +In a nutshell, the run_algorithm method now contains three additional +parameters: +* live: If True, zipline will attempt to trade live. If False or not +specified, it will run a backtest as normal. +* algo_namespace: An arbitrary namespace for the current algorithm. +It will be used to persist data between runs. +* exchange_conn: A dictionary containing the attributes required +to instantiate an exchange. Here is an example for Bitfinex: + +```python +exchange_conn = dict( + name='bitfinex', + key='', + secret=b'', + base_currency='usd' +) +``` + +The following sample algorithm uses the run_algorithm interface: + +*catalyst/examples/buy_and_hold_live.py* + +

Portfolio Management

+ +Zipline has a Portfolio class containing key metrics used by zipline +for, but not only, these reasons: + +* Placing orders: When placing orders (e.g. order_target_percent), +zipline queries the portfolio to assess the size of current positions, +cash available, etc. +* Measuring performance: The portfolio contains attributes like +cost basis of each asset, p&l, etc. which zipline uses to compute all +of its performance criteria. + +When backtesting, zipline automatically updates the Portfolio object +of its corresponding algorithm. When live trading, these updates should +be the responsibility of the exchange as it holds the truth for: + +* Executed price of each order (including fees and slippage) +* Partial / failed orders +* Cash (i.e. base currency) available +* Cost basis of each position + +If each exchange account had a one-to-one relationship with an +algorithm, portfolio metrics could be retrieved directly from the +exchange without persisting any data to the algorithm. However, +doing this would have at least the following drawbacks: + +* It may not be reasonable to ask users to dedicate an +exchange account to a single algorithm. Exchanges are not easy +to partition. +* If an exchange account contains existing positions, the calculated +cost basis would correspond to all positions, not just those +initiated by the algorithm. +* It would not be possible impose trading limits on algorithms. + +It follows that Portfolio metrics should be calculated using a strategic +combination of the exchange data and algorithm activity. While tracking +the activity of an algorithm works well in backtesting, it is more +challenging during live trading. A live algorithm might run over +several months. It might have to stop and start for many reasons. +This means that the platform should have the ability to persist +algorithm activity in order to be reliable. + +In the interest of time, I will start by persisting algorithm +activity in memory. Data will be lost when the algorithm execution stops. +The intent it to offer a simple basis from which to implement data +persistence strategies in the future. \ No newline at end of file diff --git a/docs/live-trading-wiki.md b/docs/live-trading-wiki.md new file mode 100644 index 00000000..a7dbd9f1 --- /dev/null +++ b/docs/live-trading-wiki.md @@ -0,0 +1,105 @@ +

Live Trading

+This document explains how to get started with live trading. + +

Supported Exchanges

+Catalyst can trade against these exchanges: + +* Bitfinex, id=`bitfinex` +* Bittrex, id=`bittrex` + +

Authentication

+Most exchanges require key/token combination for authentication. By +convention, Catalyst uses an "auth.json" file to hold this data. + +This example illustrates the convention using the Bitfinex exchange. +Here is how to generate key and secret values for bitfinex: +https://docs.bitfinex.com/v1/docs/api-access. Most exchanges follow +a similar process. + +The auth.json file: +```json +{ + "name": "bitfinex", + "key": "my-key", + "secret": "my-secret" +} +``` + +The file goes here: +``` +~/.catalyst/data/exchanges/bitfinex/auth.json +``` + +Note that the 'bitfinex' directory corresponds to the id of the Bitfinex +exchange as defined in the "Supported Exchanges" section above. +Attempting to run an algorithm where the targeted exchange is missing +its "auth.json" file will create the directory structure but result +in an error. + +

Currency Symbols

+Catalyst introduces a universal convention to reference +trading pairs and individual currencies. This +is required to ensure that the `symbol()` api predictably +returns the correct asset regardless of the targeted exchange. + +Exchanges tend to use their own convention to represent currencies +(e.g. XBT and BTC both represent Bitcoin on different exchanges). +Trading pairs are also inconsistent. For example, Bitfinex +puts the market currency before the base currency without a +separator, Bittrex puts the base currency first and uses a dash +seperator. + +Here is the Catalyst convention: + +*[Market Currency]_[Base Currency]* all lowercase. + +Currency symbols (e.g. btc, eth, ltc) follow the Bittrex convention. + +Here are some examples: +```python +# With Bitfinex +bitcoin_usd_asset = symbol('btc_usd') +ethereum_bitcoin_asset = symbol('eth_btc') + +# With Bittrex +ethereum_bitcoin_asset = symbol('eth_btc') +neo_ethereum_asset = symbol('neo_eth) +``` + +Note that the trading pairs are always referenced in the same manner. +However, not all trading pairs are available on all exchanges. An +error will occur if the specified trading pair is not trading +on the exchange. + +

Trading an Algorithm

+There is no special convention to follow when writing an +algorithm for live trading. The same algorithm should work in +backtest and live execution mode without modification. + +What differs are the arguments provided to the catalyst client or +`run_algorithm()` interface. Here is example: + +```python +run_algorithm( + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='bitfinex', + live=True, + algo_namespace='my_algo_trading_xrp', + base_currency='btc' +) +``` + +Here is the breakdown of the new arguments: +* live: Boolean flag which enables live trading. +* exchange_name: The name of the targeted exchange + (supported values: *bitfinex*, *bittrex*). +* algo_namespace: A arbitrary label assigned to your algorithm for + data storage purposes. +* base_currency: The base currency used to calculate the + statistics of your algorithm. Currently, the base currency of all + trading pairs of your algorithm must match this value. + +Here is a complete algorithm for reference: +[Buy Low and Sell High](../catalyst/examples/buy_low_sell_high_live.py) diff --git a/etc/requirements.txt b/etc/requirements.txt index 7ebfef2c..d3cfabd6 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -9,7 +9,9 @@ Logbook==0.12.5 # Scientific Libraries pytz==2016.4 -numpy==1.11.1 + +# FF: Upgraded numpy because of errors with version 1.11 +numpy==1.13.1 # for pandas-datareader requests-file==1.4.1 @@ -77,3 +79,4 @@ lru-dict==1.1.4 empyrical==0.2.1 tables==3.3.0 + diff --git a/setup.py b/setup.py index 74a411ed..a3243545 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,7 @@ class LazyBuildExtCommandClass(dict): Lazy command class that defers operations requiring Cython and numpy until they've actually been downloaded and installed by setup_requires. """ + def __contains__(self, key): return ( key == 'build_ext' @@ -62,6 +63,7 @@ class LazyBuildExtCommandClass(dict): Custom build_ext command that lazily adds numpy's include_dir to extensions. """ + def build_extensions(self): """ Lazily append numpy's include directory to Extension includes. @@ -75,6 +77,7 @@ class LazyBuildExtCommandClass(dict): ext.include_dirs.append(numpy_incl) super(build_ext, self).build_extensions() + return build_ext @@ -100,7 +103,8 @@ ext_modules = [ window_specialization('label'), Extension('catalyst.lib.rank', ['catalyst/lib/rank.pyx']), Extension('catalyst.data._equities', ['catalyst/data/_equities.pyx']), - Extension('catalyst.data._adjustments', ['catalyst/data/_adjustments.pyx']), + Extension('catalyst.data._adjustments', + ['catalyst/data/_adjustments.pyx']), Extension('catalyst._protocol', ['catalyst/_protocol.pyx']), Extension('catalyst.gens.sim_engine', ['catalyst/gens/sim_engine.pyx']), Extension( @@ -117,7 +121,6 @@ ext_modules = [ ), ] - STR_TO_CMP = { '<': lt, '<=': le, @@ -264,6 +267,7 @@ def setup_requirements(requirements_path, module_names, strict_bounds, ) return module_lines + conda_build = os.path.basename(sys.argv[0]) in ('conda-build', # unix 'conda-build-script.py') # win @@ -295,7 +299,7 @@ setup( ext_modules=ext_modules, include_package_data=True, package_data={root.replace(os.sep, '.'): - ['*.pyi', '*.pyx', '*.pxi', '*.pxd'] + ['*.pyi', '*.pyx', '*.pxi', '*.pxd'] for root, dirnames, filenames in os.walk('catalyst') if '__pycache__' not in root}, license='Apache 2.0', diff --git a/tests/exchange/__init__.py b/tests/exchange/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/exchange/base.py b/tests/exchange/base.py new file mode 100644 index 00000000..73c43017 --- /dev/null +++ b/tests/exchange/base.py @@ -0,0 +1,38 @@ +import unittest +from abc import ABCMeta, abstractmethod + + +class BaseExchangeTestCase(): + __metaclass__ = ABCMeta + + @abstractmethod + def test_order(self): + pass + + @abstractmethod + def test_open_orders(self): + pass + + @abstractmethod + def test_get_order(self): + pass + + @abstractmethod + def test_cancel_order(self): + pass + + @abstractmethod + def test_get_candles(self): + pass + + @abstractmethod + def test_tickers(self): + pass + + @abstractmethod + def test_get_balances(self): + pass + + @abstractmethod + def test_get_account(self): + pass diff --git a/tests/exchange/test_bitfinex.py b/tests/exchange/test_bitfinex.py new file mode 100644 index 00000000..6b9990c3 --- /dev/null +++ b/tests/exchange/test_bitfinex.py @@ -0,0 +1,70 @@ +from catalyst.exchange.bitfinex.bitfinex import Bitfinex +from .base import BaseExchangeTestCase +from logbook import Logger +import pandas as pd +from catalyst.finance.execution import (MarketOrder, + LimitOrder, + StopOrder, + StopLimitOrder) +from catalyst.exchange.exchange_utils import get_exchange_auth + +log = Logger('test_bitfinex') + + +class BitfinexTestCase(BaseExchangeTestCase): + @classmethod + def setup(self): + print ('creating bitfinex object') + auth = get_exchange_auth('bitfinex') + self.exchange = Bitfinex( + key=auth['key'], + secret=auth['secret'], + base_currency='usd' + ) + + def test_order(self): + log.info('creating order') + asset = self.exchange.get_asset('eth_usd') + order_id = self.exchange.order( + asset=asset, + style=LimitOrder(limit_price=200), + limit_price=200, + amount=0.5, + stop_price=None + ) + log.info('order created {}'.format(order_id)) + pass + + def test_open_orders(self): + log.info('retrieving open orders') + orders = self.exchange.get_open_orders() + pass + + def test_get_order(self): + log.info('retrieving order') + pass + + def test_cancel_order(self): + log.info('cancel order') + pass + + def test_get_candles(self): + log.info('retrieving candles') + pass + + def test_tickers(self): + log.info('retrieving tickers') + tickers = self.exchange.tickers([ + self.exchange.get_asset('eth_usd'), + self.exchange.get_asset('btc_usd') + ]) + pass + + def test_get_account(self): + log.info('retrieving account data') + pass + + def test_get_balances(self): + log.info('testing exchange balances') + balances = self.exchange.get_balances() + pass diff --git a/tests/exchange/test_bittrex.py b/tests/exchange/test_bittrex.py new file mode 100644 index 00000000..825af970 --- /dev/null +++ b/tests/exchange/test_bittrex.py @@ -0,0 +1,83 @@ +from catalyst.exchange.bittrex.bittrex import Bittrex +from catalyst.finance.order import Order +from .base import BaseExchangeTestCase +from logbook import Logger +from catalyst.exchange.exchange_utils import get_exchange_auth + +log = Logger('test_bittrex') + + +class BittrexTestCase(BaseExchangeTestCase): + @classmethod + def setup(self): + print ('creating bittrex object') + auth = get_exchange_auth('bittrex') + self.exchange = Bittrex( + key=auth['key'], + secret=auth['secret'], + base_currency='btc' + ) + + def test_order(self): + log.info('creating order') + asset = self.exchange.get_asset('neo_btc') + order_id = self.exchange.order( + asset=asset, + limit_price=0.0005, + amount=1, + ) + log.info('order created {}'.format(order_id)) + assert order_id is not None + pass + + def test_open_orders(self): + log.info('retrieving open orders') + asset = self.exchange.get_asset('neo_btc') + orders = self.exchange.get_open_orders(asset) + pass + + def test_get_order(self): + log.info('retrieving order') + order = self.exchange.get_order( + u'2c584020-9caf-4af5-bde0-332c0bba17e2') + assert isinstance(order, Order) + pass + + def test_cancel_order(self, ): + log.info('cancel order') + self.exchange.cancel_order(u'dc7bcca2-5219-4145-8848-8a593d2a72f9') + pass + + def test_get_candles(self): + log.info('retrieving candles') + ohlcv_neo = self.exchange.get_candles( + data_frequency='5m', + assets=self.exchange.get_asset('neo_btc') + ) + ohlcv_neo_ubq = self.exchange.get_candles( + data_frequency='5m', + assets=[ + self.exchange.get_asset('neo_btc'), + self.exchange.get_asset('ubq_btc') + ], + bar_count=14 + ) + pass + + def test_tickers(self): + log.info('retrieving tickers') + tickers = self.exchange.tickers([ + self.exchange.get_asset('ubq_btc'), + self.exchange.get_asset('neo_btc') + ]) + assert len(tickers) == 2 + pass + + def test_get_balances(self): + log.info('testing wallet balances') + balances = self.exchange.get_balances() + pass + + def test_get_account(self): + log.info('testing account data') + pass diff --git a/tests/exchange/test_clock.py b/tests/exchange/test_clock.py new file mode 100644 index 00000000..94414d27 --- /dev/null +++ b/tests/exchange/test_clock.py @@ -0,0 +1,50 @@ +from unittest import TestCase +from logbook import Logger +from mock import patch, sentinel +from catalyst.exchange.exchange_clock import ExchangeClock +from catalyst.utils.calendars.trading_calendar import days_at_time +from datetime import time +from collections import defaultdict +from catalyst.utils.calendars import get_calendar +import pandas as pd + +log = Logger('ExchangeClockTestCase') + + +class ExchangeClockTestCase(TestCase): + @classmethod + def setUpClass(cls): + cls.open_calendar = get_calendar("OPEN") + + cls.sessions = pd.Timestamp.utcnow() + + def setUp(self): + self.internal_clock = None + self.events = defaultdict(list) + + def advance_clock(self, x): + """Mock function for sleep. Advances the internal clock by 1 min""" + # The internal clock advance time must be 1 minute to match + # MinutesSimulationClock's update frequency + self.internal_clock += pd.Timedelta('1 min') + + def get_clock(self, arg, *args, **kwargs): + """Mock function for pandas.to_datetime which is used to query the + current time in RealtimeClock""" + assert arg == "now" + return self.internal_clock + + def test_clock(self): + with patch('catalyst.exchange.exchange_clock.pd.to_datetime') as to_dt, \ + patch('catalyst.exchange.exchange_clock.sleep') as sleep: + clock = ExchangeClock(sessions=self.sessions) + to_dt.side_effect = self.get_clock + sleep.side_effect = self.advance_clock + start_time = pd.Timestamp.utcnow() + self.internal_clock = start_time + + events = list(clock) + + # Event 0 is SESSION_START which always happens at 00:00. + ts, event_type = events[1] + pass