diff --git a/catalyst/examples/mean_reversion_simple.py b/catalyst/examples/mean_reversion_simple.py index 130343c9..21e0c7cc 100644 --- a/catalyst/examples/mean_reversion_simple.py +++ b/catalyst/examples/mean_reversion_simple.py @@ -256,7 +256,7 @@ if __name__ == '__main__': initialize=initialize, handle_data=handle_data, analyze=analyze, - exchange_name='bitfinex', + exchange_name='bittrex', algo_namespace=NAMESPACE, base_currency='eth', start=pd.to_datetime('2017-10-01', utc=True), @@ -275,5 +275,6 @@ if __name__ == '__main__': live=True, algo_namespace=NAMESPACE, base_currency='eth', - live_graph=False + live_graph=False, + simulate_orders=True ) diff --git a/catalyst/exchange/ccxt/ccxt_exchange.py b/catalyst/exchange/ccxt/ccxt_exchange.py index 36a0bbdb..7f45627f 100644 --- a/catalyst/exchange/ccxt/ccxt_exchange.py +++ b/catalyst/exchange/ccxt/ccxt_exchange.py @@ -3,6 +3,7 @@ from collections import defaultdict import ccxt import pandas as pd +import six from ccxt import ExchangeNotAvailable from six import string_types @@ -19,7 +20,7 @@ from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ ExchangeSymbolsNotFound, ExchangeRequestError, InvalidOrderStyle, \ ExchangeNotFoundError from catalyst.exchange.exchange_utils import mixin_market_params, \ - from_ms_timestamp + from_ms_timestamp, get_epoch log = Logger('CCXT', level=LOG_LEVEL) @@ -182,33 +183,48 @@ class CCXT(Exchange): def get_candles(self, freq, assets, bar_count=None, start_dt=None, end_dt=None): + is_single = (isinstance(assets, TradingPair)) + if is_single: + assets = [assets] + symbols = self.get_symbols(assets) + timeframe = self.get_timeframe(freq) - delta = start_dt - pd.to_datetime('1970-1-1', utc=True) + delta = start_dt - get_epoch() ms = int(delta.total_seconds()) * 1000 candles = dict() for asset in assets: - ohlcvs = self.api.fetch_ohlcv( - symbol=symbols[0], - timeframe=timeframe, - since=ms, - limit=bar_count, - params={} - ) + try: + ohlcvs = self.api.fetch_ohlcv( + symbol=symbols[0], + timeframe=timeframe, + since=ms, + limit=bar_count, + params={} + ) - candles[asset] = [] - for ohlcv in ohlcvs: - candles[asset].append(dict( - last_traded=pd.to_datetime(ohlcv[0], unit='ms', utc=True), - open=ohlcv[1], - high=ohlcv[2], - low=ohlcv[3], - close=ohlcv[4], - volume=ohlcv[5] - )) + candles[asset] = [] + for ohlcv in ohlcvs: + candles[asset].append(dict( + last_traded=pd.to_datetime( + ohlcv[0], unit='ms', utc=True + ), + open=ohlcv[1], + high=ohlcv[2], + low=ohlcv[3], + close=ohlcv[4], + volume=ohlcv[5] + )) - return candles + except Exception as e: + raise ExchangeRequestError(error=e) + + if is_single: + return six.next(six.itervalues(candles)) + + else: + return candles def _fetch_symbol_map(self, is_local): try: diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index a8d58069..ea938387 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -324,7 +324,6 @@ class Exchange: list[Transaction] """ - transactions = list() if self.portfolio.open_orders: for order_id in list(self.portfolio.open_orders): log.debug('found open order: {}'.format(order_id)) @@ -344,12 +343,13 @@ class Exchange: order_id=order.id, commission=order.commission ) - transactions.append(transaction) + yield order, transaction - self.portfolio.execute_order(order, transaction) + # self.portfolio.execute_order(order, transaction) elif order.status == ORDER_STATUS.CANCELLED: - self.portfolio.remove_order(order) + # self.portfolio.remove_order(order) + yield order, None else: delta = pd.Timestamp.utcnow() - order.dt @@ -359,7 +359,6 @@ class Exchange: delta=delta ) ) - return transactions def get_spot_value(self, assets, field, dt=None, data_frequency='minute'): """ diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index c2102015..7f671c96 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -32,7 +32,7 @@ from catalyst.exchange.exchange_errors import ( ExchangeRequestError, ExchangePortfolioDataError, ExchangeTransactionError, - OrphanOrderError) + OrphanOrderError, OrderTypeNotSupported) from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ ExchangeLimitOrder, ExchangeStopOrder from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object, \ @@ -63,9 +63,72 @@ class ExchangeAlgorithmExecutor(AlgorithmSimulator): class ExchangeTradingAlgorithmBase(TradingAlgorithm): def __init__(self, *args, **kwargs): self.exchanges = kwargs.pop('exchanges', None) + self.simulate_orders = kwargs.pop('simulate_orders', None) super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs) + if self.simulate_orders is None \ + and self.sim_params.arena == 'backtest': + self.simulate_orders = True + + self.blotter = ExchangeBlotter( + data_frequency=self.data_frequency, + # Default to NeverCancel in catalyst + cancel_policy=self.cancel_policy, + simulate_orders=self.simulate_orders + ) + + @staticmethod + def __convert_order_params_for_blotter(limit_price, stop_price, style): + """ + Helper method for converting deprecated limit_price and stop_price + arguments into ExecutionStyle instances. + + This function assumes that either style == None or (limit_price, + stop_price) == (None, None). + """ + if stop_price: + raise OrderTypeNotSupported(order_type='stop') + + if style: + if limit_price is not None: + raise ValueError( + 'An order style and a limit price was included in the ' + 'order. Please pick one to avoid any possible conflict.' + ) + + # Currently limiting order types or limit and market to + # be in-line with CXXT and many exchanges. We'll consider + # adding more order types in the future. + if not isinstance(style, ExchangeLimitOrder) or \ + not isinstance(style, MarketOrder): + raise OrderTypeNotSupported( + order_type=style.__class__.__name__ + ) + + return style + + if limit_price: + return ExchangeLimitOrder(limit_price) + else: + return MarketOrder() + + def _calculate_order(self, asset, amount, + limit_price=None, stop_price=None, style=None): + # Raises a ZiplineError if invalid parameters are detected. + self.validate_order_params(asset, + amount, + limit_price, + stop_price, + style) + + # Convert deprecated limit_price and stop_price parameters to use + # ExecutionStyle objects. + style = self.__convert_order_params_for_blotter(limit_price, + stop_price, + style) + return amount, style + def round_order(self, amount, asset): """ We need fractions with cryptocurrencies @@ -204,50 +267,8 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase): super(ExchangeTradingAlgorithmBacktest, self).__init__(*args, **kwargs) self.frame_stats = list() - self.blotter = ExchangeBlotter( - data_frequency=self.data_frequency, - # Default to NeverCancel in catalyst - cancel_policy=self.cancel_policy, - ) log.info('initialized trading algorithm in backtest mode') - def _calculate_order(self, asset, amount, - limit_price=None, stop_price=None, style=None): - # Raises a ZiplineError if invalid parameters are detected. - self.validate_order_params(asset, - amount, - limit_price, - stop_price, - style) - - # Convert deprecated limit_price and stop_price parameters to use - # ExecutionStyle objects. - style = self.__convert_order_params_for_blotter(limit_price, - stop_price, - style) - return amount, style - - @staticmethod - def __convert_order_params_for_blotter(limit_price, stop_price, style): - """ - Helper method for converting deprecated limit_price and stop_price - arguments into ExecutionStyle instances. - - This function assumes that either style == None or (limit_price, - stop_price) == (None, None). - """ - if style: - assert (limit_price, stop_price) == (None, None) - return style - if limit_price and stop_price: - return ExchangeStopLimitOrder(limit_price, stop_price) - if limit_price: - return ExchangeLimitOrder(limit_price) - if stop_price: - return ExchangeStopOrder(stop_price) - else: - return MarketOrder() - def is_last_frame_of_day(self, data): # TODO: adjust here to support more intervals next_frame_dt = data.current_dt + timedelta(minutes=1) @@ -289,7 +310,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): def __init__(self, *args, **kwargs): self.algo_namespace = kwargs.pop('algo_namespace', None) self.live_graph = kwargs.pop('live_graph', None) - self.simulate_orders = kwargs.pop('simulate_orders', None) self._clock = None self.frame_stats = deque(maxlen=60) @@ -416,16 +436,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): return self.trading_client.transform() def updated_portfolio(self): - """ - We skip the entire performance tracker business and update the - portfolio directly. - - Returns - ------- - ExchangePortfolio - - """ - # TODO: build cumulative portfolio return self.perf_tracker.get_portfolio(False) def updated_account(self): @@ -450,6 +460,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): last_sale_date=position.last_sale_date, last_sale_price=position.last_sale_price ) + except ExchangeRequestError as e: log.warn( 'update portfolio attempt {}: {}'.format(attempt_index, e) @@ -464,30 +475,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): error=e ) - def _check_open_orders(self, attempt_index=0): - try: - orders = list() - for exchange_name in self.exchanges: - exchange = self.exchanges[exchange_name] - exchange_orders = exchange.check_open_orders() - - orders += exchange_orders - - return orders - except ExchangeRequestError as e: - log.warn( - 'check open orders attempt {}: {}'.format(attempt_index, e) - ) - if attempt_index < self.retry_check_open_orders: - sleep(self.retry_delay) - return self._check_open_orders(attempt_index + 1) - else: - raise ExchangePortfolioDataError( - data_type='order-status', - attempts=attempt_index, - error=e - ) - def add_pnl_stats(self, period_stats): """ Save p&l stats. @@ -577,14 +564,19 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): if not self.is_running: return - self._synchronize_portfolio() + new_transactions, new_commissions, closed_orders = \ + self.blotter.get_transactions(data) - transactions = self._check_open_orders() - if len(transactions) > 0: - for transaction in transactions: - self.perf_tracker.process_transaction(transaction) + self.blotter.prune_orders(closed_orders) - self.perf_tracker.update_performance() + for transaction in new_transactions: + self.perf_tracker.process_transaction(transaction) + + # since this order was modified, record it + order = self.blotter.orders[transaction.order_id] + self.perf_tracker.process_order(order) + + self.perf_tracker.update_performance() if self._handle_data: self._handle_data(self, data) @@ -713,25 +705,35 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): order: Order The catalyst order object or None """ - amount, style = self._calculate_order(asset, amount, - limit_price, stop_price, - style) - order_id = self._order(asset, amount, limit_price, stop_price, style) + if self.simulate_orders: + order_id = super(ExchangeTradingAlgorithmLive, self).order( + asset, amount, limit_price, stop_price, style + ) + log.debug('created a simulated order {}'.format(order_id)) + + else: + amount, style = self._calculate_order( + asset, amount, limit_price, stop_price, style + ) + order_id = self._order( + asset, amount, limit_price, stop_price, style + ) - exchange = self.exchanges[asset.exchange] - exchange_portfolio = exchange.portfolio if order_id is not None: + current_order = None + for order in self.blotter.open_orders[asset]: + if current_order is None and order.id == order_id: + self.perf_tracker.process_order(order) + current_order = order - if order_id in exchange_portfolio.open_orders: - order = exchange_portfolio.open_orders[order_id] - self.perf_tracker.process_order(order) - return order + if current_order is not None: + return current_order else: raise OrphanOrderError( order_id=order_id, - exchange=exchange.name + exchange=asset.exchange ) else: log.warn('unable to order {} {} on exchange {}'.format( diff --git a/catalyst/exchange/exchange_blotter.py b/catalyst/exchange/exchange_blotter.py index 365682c7..72e6d539 100644 --- a/catalyst/exchange/exchange_blotter.py +++ b/catalyst/exchange/exchange_blotter.py @@ -1,7 +1,11 @@ +from time import sleep + from catalyst.assets._assets import TradingPair from logbook import Logger from catalyst.constants import LOG_LEVEL +from catalyst.exchange.exchange_errors import ExchangeRequestError, \ + ExchangePortfolioDataError from catalyst.finance.blotter import Blotter from catalyst.finance.commission import CommissionModel from catalyst.finance.slippage import SlippageModel @@ -121,6 +125,8 @@ class TradingPairFixedSlippage(SlippageModel): class ExchangeBlotter(Blotter): def __init__(self, *args, **kwargs): + self.simulate_orders = kwargs.pop('simulate_orders', False) + super(ExchangeBlotter, self).__init__(*args, **kwargs) # Using the equity models for now @@ -132,3 +138,43 @@ class ExchangeBlotter(Blotter): self.commission_models = { TradingPair: TradingPairFeeSchedule() } + + def get_exchange_transactions(self, attempt_index=0): + closed_orders = [] + transactions = [] + commissions = [] + + try: + for exchange_name in self.exchanges: + exchange = self.exchanges[exchange_name] + for order, txn in exchange.check_open_orders(): + + order.dt = txn.dt + + transactions.append(txn) + + if not order.open: + closed_orders.append(order) + + return transactions, commissions, closed_orders + + except ExchangeRequestError as e: + log.warn( + 'check open orders attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_check_open_orders: + sleep(self.retry_delay) + return self.get_exchange_transactions(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='order-status', + attempts=attempt_index, + error=e + ) + + def get_transactions(self, bar_data): + if self.simulate_orders: + return super(ExchangeBlotter, self).get_transactions(bar_data) + + else: + return self.get_exchange_transactions() diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index 5530ccb2..7bf66f6c 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -217,6 +217,7 @@ class PricingDataNotLoadedError(ZiplineError): '{data_frequency} -i {symbol_list}`. See catalyst documentation ' 'for details.').strip() + class PricingDataValueError(ZiplineError): msg = ('Unable to retrieve pricing data for {exchange} {symbol} ' '[{start_dt} - {end_dt}]: {error}').strip() @@ -244,3 +245,9 @@ class NoDataAvailableOnExchange(ZiplineError): class NoValueForField(ZiplineError): msg = ('Value not found for field: {field}.').strip() + + +class OrderTypeNotSupported(ZiplineError): + msg = ( + 'Order type `{order_type}` not currencly supported by Catalyst. ' + 'Please use `limit` or `market` orders only.').strip() diff --git a/catalyst/exchange/exchange_portfolio.py b/catalyst/exchange/exchange_portfolio.py index b9f45fbb..1df8f9b2 100644 --- a/catalyst/exchange/exchange_portfolio.py +++ b/catalyst/exchange/exchange_portfolio.py @@ -40,7 +40,13 @@ class ExchangePortfolio(Portfolio): """ log.debug('creating order {}'.format(order.id)) - self.open_orders[order.id] = order + + open_orders = self.open_orders[order.asset] \ + if order.asset is self.open_orders else [] + + open_orders.append(order) + + self.open_orders[order.asset] = open_orders order_position = self.positions[order.asset] \ if order.asset in self.positions else None @@ -52,6 +58,17 @@ class ExchangePortfolio(Portfolio): order_position.amount += order.amount log.debug('open order added to portfolio') + def _remove_open_order(self, order): + try: + open_orders = self.open_orders[order.asset] + if order in open_orders: + open_orders.remove(order) + + except Exception: + raise ValueError( + 'unable to clear order not found in open order list.' + ) + def execute_order(self, order, transaction): """ Update the open orders and positions to apply an executed order. @@ -66,7 +83,7 @@ class ExchangePortfolio(Portfolio): """ log.debug('executing order {}'.format(order.id)) - del self.open_orders[order.id] + self._remove_open_order(order) order_position = self.positions[order.asset] \ if order.asset in self.positions else None @@ -99,7 +116,7 @@ class ExchangePortfolio(Portfolio): """ log.info('removing cancelled order {}'.format(order.id)) - del self.open_orders[order.id] + self._remove_open_order(order) order_position = self.positions[order.asset] \ if order.asset in self.positions else None diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index c2250187..0ae93ea8 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -627,3 +627,7 @@ def mixin_market_params(exchange_name, params, market): def from_ms_timestamp(ms): return pd.to_datetime(ms, unit='ms', utc=True) + + +def get_epoch(): + return pd.to_datetime('1970-1-1', utc=True) diff --git a/etc/requirements.txt b/etc/requirements.txt index dee52a34..be375309 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -81,5 +81,5 @@ empyrical==0.2.1 tables==3.3.0 #Catalyst dependencies -ccxt==1.10.251 +ccxt==1.10.283