From e42276affa7ba9ced99ce00d14f20965482fad86 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Wed, 6 Dec 2017 18:02:02 -0500 Subject: [PATCH] BLD: making some adjustment to the blotter to improve paper trading --- catalyst/examples/mean_reversion_simple.py | 2 +- catalyst/exchange/ccxt/ccxt_exchange.py | 18 +-- catalyst/exchange/exchange.py | 135 +++-------------- catalyst/exchange/exchange_algorithm.py | 159 +++++---------------- catalyst/exchange/exchange_blotter.py | 113 +++++++++++++-- catalyst/exchange/exchange_data_portal.py | 11 +- catalyst/exchange/exchange_utils.py | 11 ++ catalyst/exchange/factory.py | 4 +- catalyst/utils/run_algo.py | 13 -- 9 files changed, 179 insertions(+), 287 deletions(-) diff --git a/catalyst/examples/mean_reversion_simple.py b/catalyst/examples/mean_reversion_simple.py index 21e0c7cc..3d85e3f5 100644 --- a/catalyst/examples/mean_reversion_simple.py +++ b/catalyst/examples/mean_reversion_simple.py @@ -276,5 +276,5 @@ if __name__ == '__main__': algo_namespace=NAMESPACE, base_currency='eth', live_graph=False, - simulate_orders=True + simulate_orders=False ) diff --git a/catalyst/exchange/ccxt/ccxt_exchange.py b/catalyst/exchange/ccxt/ccxt_exchange.py index 7f45627f..09879796 100644 --- a/catalyst/exchange/ccxt/ccxt_exchange.py +++ b/catalyst/exchange/ccxt/ccxt_exchange.py @@ -35,8 +35,7 @@ SUPPORTED_EXCHANGES = dict( class CCXT(Exchange): - def __init__(self, exchange_name, key, secret, base_currency, - portfolio=None): + def __init__(self, exchange_name, key, secret, base_currency): log.debug( 'finding {} in CCXT exchanges:\n{}'.format( exchange_name, ccxt.exchanges @@ -69,7 +68,6 @@ class CCXT(Exchange): self.load_assets() self.base_currency = base_currency - self._portfolio = portfolio self.transactions = defaultdict(list) self.num_candles_limit = 2000 @@ -508,18 +506,7 @@ class CCXT(Exchange): return orders - def _get_asset_from_order(self, order_id): - open_orders = self.portfolio.open_orders - order = next( - (open_orders[id] for id in open_orders if id == order_id), - None - ) # type: Order - return order.asset if order is not None else None - def get_order(self, order_id, asset_or_symbol=None): - if asset_or_symbol is None and self.portfolio is not None: - asset_or_symbol = self._get_asset_from_order(order_id) - if asset_or_symbol is None: log.debug( 'order not found in memory, the request might fail ' @@ -540,9 +527,6 @@ class CCXT(Exchange): order_id = order_param.id \ if isinstance(order_param, Order) else order_param - if asset_or_symbol is None and self.portfolio is not None: - asset_or_symbol = self._get_asset_from_order(order_id) - if asset_or_symbol is None: log.debug( 'order not found in memory, cancelling order might fail ' diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index ea938387..072f4701 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -20,7 +20,6 @@ from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \ NoDataAvailableOnExchange, NoValueForField from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ ExchangeLimitOrder, ExchangeStopOrder -from catalyst.exchange.exchange_portfolio import ExchangePortfolio from catalyst.exchange.exchange_utils import get_exchange_symbols, \ get_frequency, resample_history_df from catalyst.finance.order import ORDER_STATUS @@ -36,7 +35,6 @@ class Exchange: self.name = None self.assets = [] self._symbol_maps = [None, None] - self._portfolio = None self.minute_writer = None self.minute_reader = None self.base_currency = None @@ -46,27 +44,6 @@ class Exchange: self.request_cpt = None self.bundle = ExchangeBundle(self.name) - @property - def positions(self): - return self.portfolio.positions - - @property - def portfolio(self): - """ - The exchange portfolio - - Returns - ------- - ExchangePortfolio - """ - if self._portfolio is None: - self._portfolio = ExchangePortfolio( - start_date=pd.Timestamp.utcnow() - ) - self.synchronize_portfolio() - - return self._portfolio - @abstractproperty def account(self): pass @@ -313,53 +290,6 @@ class Exchange: """ pass - def check_open_orders(self): - """ - Loop through the list of open orders in the Portfolio object. - For each executed order found, create a transaction and apply to the - Portfolio. - - Returns - ------- - list[Transaction] - - """ - if self.portfolio.open_orders: - for order_id in list(self.portfolio.open_orders): - log.debug('found open order: {}'.format(order_id)) - - order, executed_price = self.get_order(order_id) - log.debug( - 'got updated order {} {}'.format( - order, executed_price - ) - ) - if order.status == ORDER_STATUS.FILLED: - transaction = Transaction( - asset=order.asset, - amount=order.amount, - dt=pd.Timestamp.utcnow(), - price=executed_price, - order_id=order.id, - commission=order.commission - ) - yield order, transaction - - # self.portfolio.execute_order(order, transaction) - - elif order.status == ORDER_STATUS.CANCELLED: - # self.portfolio.remove_order(order) - yield order, None - - else: - delta = pd.Timestamp.utcnow() - order.dt - log.info( - 'order {order_id} still open after {delta}'.format( - order_id=order_id, - delta=delta - ) - ) - def get_spot_value(self, assets, field, dt=None, data_frequency='minute'): """ Public API method that returns a scalar value representing the value @@ -668,7 +598,7 @@ class Exchange: return df - def synchronize_portfolio(self): + def calculate_totals(self, positions=None): """ Update the portfolio cash and position balances based on the latest ticker prices. @@ -677,42 +607,36 @@ class Exchange: log.debug('synchronizing portfolio with exchange {}'.format(self.name)) balances = self.get_balances() - base_position_available = balances[self.base_currency]['free'] \ + cash = balances[self.base_currency]['free'] \ if self.base_currency in balances else None - if base_position_available is None: + if cash is None: raise BaseCurrencyNotFoundError( base_currency=self.base_currency, - exchange=self.name.title() + exchange=self.name ) + log.debug('found base currency balance: {}'.format(cash)) - portfolio = self._portfolio - portfolio.cash = base_position_available - log.debug('found base currency balance: {}'.format(portfolio.cash)) - - if portfolio.starting_cash is None: - portfolio.starting_cash = portfolio.cash - - if portfolio.positions: - assets = list(portfolio.positions.keys()) + positions_value = 0.0 + if positions is not None: + assets = set([position.asset for position in positions]) tickers = self.tickers(assets) + log.debug('got tickers for positions: {}'.format(tickers)) - portfolio.positions_value = 0.0 for asset in tickers: - # TODO: convert if the position is not in the base currency ticker = tickers[asset] - position = portfolio.positions[asset] + positions = [p for p in positions if p.asset == asset] - position.last_sale_price = ticker['last_price'] - position.last_sale_date = ticker['last_traded'] + for position in positions: + position.last_sale_price = ticker['last_price'] + position.last_sale_date = ticker['last_traded'] - portfolio.positions_value += \ - position.amount * position.last_sale_price - portfolio.portfolio_value = \ - portfolio.positions_value + portfolio.cash + positions_value += \ + position.amount * position.last_sale_price - def order(self, asset, amount, limit_price=None, stop_price=None, - style=None): + return cash, positions_value + + def order(self, asset, amount, style): """Place an order. Parameters @@ -771,22 +695,8 @@ class Exchange: ) is_buy = (amount > 0) + display_price = style.get_limit_price(is_buy) - if limit_price is not None and stop_price is not None: - style = ExchangeStopLimitOrder( - limit_price, stop_price, exchange=self.name - ) - - elif limit_price is not None: - style = ExchangeLimitOrder(limit_price, exchange=self.name) - - elif stop_price is not None: - style = ExchangeStopOrder(stop_price, exchange=self.name) - - else: - style = MarketOrder(exchange=self.name) - - display_price = limit_price if limit_price is not None else stop_price log.debug( 'issuing {side} order of {amount} {symbol} for {type}: {price}'.format( side='buy' if is_buy else 'sell', @@ -797,12 +707,7 @@ class Exchange: ) ) - order = self.create_order(asset, amount, is_buy, style) - if order: - self._portfolio.create_order(order) - return order.id - else: - return None + return self.create_order(asset, amount, is_buy, style) # The methods below must be implemented for each exchange. @abstractmethod diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index 7f671c96..8cc84224 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -21,23 +21,19 @@ from time import sleep import logbook import pandas as pd -from catalyst.assets._assets import TradingPair import catalyst.protocol as zp from catalyst.algorithm import TradingAlgorithm from catalyst.constants import LOG_LEVEL -from catalyst.errors import OrderInBeforeTradingStart from catalyst.exchange.exchange_blotter import ExchangeBlotter from catalyst.exchange.exchange_errors import ( ExchangeRequestError, ExchangePortfolioDataError, - ExchangeTransactionError, - OrphanOrderError, OrderTypeNotSupported) -from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ - ExchangeLimitOrder, ExchangeStopOrder + OrderTypeNotSupported) +from catalyst.exchange.exchange_execution import ExchangeLimitOrder from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object, \ get_algo_folder, get_algo_df, \ - save_algo_df + save_algo_df, group_assets_by_exchange from catalyst.exchange.live_graph_clock import LiveGraphClock from catalyst.exchange.simple_clock import SimpleClock from catalyst.exchange.stats_utils import get_pretty_stats @@ -45,10 +41,8 @@ from catalyst.finance.execution import MarketOrder from catalyst.finance.performance.period import calc_period_stats from catalyst.gens.tradesimulation import AlgorithmSimulator from catalyst.utils.api_support import ( - api_method, - disallowed_in_before_trading_start) -from catalyst.utils.input_validation import error_keywords, ensure_upper_case, \ - expect_types + api_method) +from catalyst.utils.input_validation import error_keywords, ensure_upper_case from catalyst.utils.math_utils import round_nearest from catalyst.utils.preprocess import preprocess @@ -75,7 +69,8 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): data_frequency=self.data_frequency, # Default to NeverCancel in catalyst cancel_policy=self.cancel_policy, - simulate_orders=self.simulate_orders + simulate_orders=self.simulate_orders, + exchanges=self.exchanges ) @staticmethod @@ -441,22 +436,25 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): def updated_account(self): return self.perf_tracker.get_account(False) - def _synchronize_portfolio(self, attempt_index=0): + def update_positions(self, attempt_index=0): + tracker = self.perf_tracker.position_tracker + try: - for exchange_name in self.exchanges: - exchange = self.exchanges[exchange_name] + # Position keys correspond to assets + assets = list(tracker.positions) + exchange_assets = group_assets_by_exchange(assets) + for exchange_name in exchange_assets: + assets = exchange_assets[exchange_name] + exchange_positions = \ + [tracker.positions[asset] for asset in assets] - exchange.synchronize_portfolio() + exchange = self.exchanges[exchange_name] # Type: Exchange + cash, positions_value = \ + exchange.calculate_totals(exchange_positions) - # Applying the updated last_sales_price to the positions - # in the performance tracker. This seems a bit redundant - # but it will make sense when we have multiple exchange portfolios - # feeding into the same performance tracker. - tracker = self.perf_tracker.todays_performance.position_tracker - for asset in exchange.portfolio.positions: - position = exchange.portfolio.positions[asset] + for position in exchange_positions: tracker.update_position( - asset=asset, + asset=position.asset, last_sale_date=position.last_sale_date, last_sale_price=position.last_sale_price ) @@ -467,7 +465,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): ) if attempt_index < self.retry_synchronize_portfolio: sleep(self.retry_delay) - self._synchronize_portfolio(attempt_index + 1) + self.update_positions(attempt_index + 1) else: raise ExchangePortfolioDataError( data_type='update-portfolio', @@ -576,7 +574,10 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): order = self.blotter.orders[transaction.order_id] self.perf_tracker.process_order(order) - self.perf_tracker.update_performance() + if len(new_transactions) > 0: + self.perf_tracker.update_performance() + + self.update_positions() if self._handle_data: self._handle_data(self, data) @@ -643,103 +644,21 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): log.warn('unable to save minute perfs to disk: {}'.format(e)) try: - for exchange_name in self.exchanges: - exchange = self.exchanges[exchange_name] - save_algo_object( - algo_name=self.algo_namespace, - key='portfolio_{}'.format(exchange_name), - obj=exchange.portfolio - ) + blotter_params = dict( + open_orders=self.blotter.open_orders, + orders=self.blotter.orders, + new_orders=self.blotter.new_orders, + data_frequency=self.blotter.data_frequency, + current_dt=self.blotter.current_dt, + ) + save_algo_object( + algo_name=self.algo_namespace, + key='blotter', + obj=blotter_params, + ) except Exception as e: log.warn('unable to save portfolio to disk: {}'.format(e)) - def _order(self, - asset, - amount, - limit_price=None, - stop_price=None, - style=None, - attempt_index=0): - try: - exchange = self.exchanges[asset.exchange] - return exchange.order(asset, amount, limit_price, - stop_price, - style) - except ExchangeRequestError as e: - log.warn( - 'order attempt {}: {}'.format(attempt_index, e) - ) - if attempt_index < self.retry_order: - sleep(self.retry_delay) - return self._order( - asset, amount, limit_price, stop_price, style, - attempt_index + 1) - else: - raise ExchangeTransactionError( - transaction_type='order', - attempts=attempt_index, - error=e - ) - - @api_method - @disallowed_in_before_trading_start(OrderInBeforeTradingStart()) - @expect_types(asset=TradingPair) - def order(self, - asset, - amount, - limit_price=None, - stop_price=None, - style=None): - """ - We use the exchange specific portfolio to place orders. - The cumulative portfolio does not contain open orders but exchange - portfolios do. - - Parameters - ---------- - asset: TradingPair - amount: float - limit_price: float - stop_price: float - style: Style - order: Order - The catalyst order object or None - """ - - if self.simulate_orders: - order_id = super(ExchangeTradingAlgorithmLive, self).order( - asset, amount, limit_price, stop_price, style - ) - log.debug('created a simulated order {}'.format(order_id)) - - else: - amount, style = self._calculate_order( - asset, amount, limit_price, stop_price, style - ) - order_id = self._order( - asset, amount, limit_price, stop_price, style - ) - - if order_id is not None: - current_order = None - for order in self.blotter.open_orders[asset]: - if current_order is None and order.id == order_id: - self.perf_tracker.process_order(order) - current_order = order - - if current_order is not None: - return current_order - - else: - raise OrphanOrderError( - order_id=order_id, - exchange=asset.exchange - ) - else: - log.warn('unable to order {} {} on exchange {}'.format( - amount, asset.symbol, asset.exchange)) - return None - @api_method def batch_market_order(self, share_counts): raise NotImplementedError() diff --git a/catalyst/exchange/exchange_blotter.py b/catalyst/exchange/exchange_blotter.py index 72e6d539..79757221 100644 --- a/catalyst/exchange/exchange_blotter.py +++ b/catalyst/exchange/exchange_blotter.py @@ -1,15 +1,18 @@ from time import sleep +import pandas as pd from catalyst.assets._assets import TradingPair from logbook import Logger from catalyst.constants import LOG_LEVEL from catalyst.exchange.exchange_errors import ExchangeRequestError, \ - ExchangePortfolioDataError + ExchangePortfolioDataError, OrphanOrderError, ExchangeTransactionError from catalyst.finance.blotter import Blotter from catalyst.finance.commission import CommissionModel +from catalyst.finance.order import ORDER_STATUS from catalyst.finance.slippage import SlippageModel -from catalyst.finance.transaction import create_transaction +from catalyst.finance.transaction import create_transaction, Transaction +from catalyst.utils.input_validation import expect_types log = Logger('exchange_blotter', level=LOG_LEVEL) @@ -127,6 +130,11 @@ class ExchangeBlotter(Blotter): def __init__(self, *args, **kwargs): self.simulate_orders = kwargs.pop('simulate_orders', False) + self.exchanges = kwargs.pop('exchanges', None) + if not self.exchanges: + raise ValueError('ExchangeBlotter must have an `exchanges` ' + 'attribute.') + super(ExchangeBlotter, self).__init__(*args, **kwargs) # Using the equity models for now @@ -139,22 +147,106 @@ class ExchangeBlotter(Blotter): TradingPair: TradingPairFeeSchedule() } + def exchange_order(self, asset, amount, style=None, attempt_index=0): + try: + exchange = self.exchanges[asset.exchange] + return exchange.order( + asset, amount, style + ) + except ExchangeRequestError as e: + log.warn( + 'order attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_order: + sleep(self.retry_delay) + + return self.exchange_order( + asset, amount, style, attempt_index + 1 + ) + else: + raise ExchangeTransactionError( + transaction_type='order', + attempts=attempt_index, + error=e + ) + + @expect_types(asset=TradingPair) + def order(self, asset, amount, style, order_id=None): + if self.simulate_orders: + return super(ExchangeBlotter, self).order( + asset, amount, style, order_id + ) + + else: + order = self.exchange_order( + asset, amount, style + ) + + self.open_orders[order.asset].append(order) + self.orders[order.id] = order + self.new_orders.append(order) + + return order.id + + def check_open_orders(self): + """ + Loop through the list of open orders in the Portfolio object. + For each executed order found, create a transaction and apply to the + Portfolio. + + Returns + ------- + list[Transaction] + + """ + for asset in self.open_orders: + exchange = self.exchanges[asset.exchange] + + for order in self.open_orders[asset]: + log.debug('found open order: {}'.format(order.id)) + + order, executed_price = exchange.get_order(order.id, asset) + log.debug( + 'got updated order {} {}'.format( + order, executed_price + ) + ) + if order.status == ORDER_STATUS.FILLED: + transaction = Transaction( + asset=order.asset, + amount=order.amount, + dt=pd.Timestamp.utcnow(), + price=executed_price, + order_id=order.id, + commission=order.commission + ) + yield order, transaction + + elif order.status == ORDER_STATUS.CANCELLED: + yield order, None + + else: + delta = pd.Timestamp.utcnow() - order.dt + log.info( + 'order {order_id} still open after {delta}'.format( + order_id=order.id, + delta=delta + ) + ) + def get_exchange_transactions(self, attempt_index=0): closed_orders = [] transactions = [] commissions = [] try: - for exchange_name in self.exchanges: - exchange = self.exchanges[exchange_name] - for order, txn in exchange.check_open_orders(): + for order, txn in self.check_open_orders(): + order.dt = txn.dt - order.dt = txn.dt + transactions.append(txn) - transactions.append(txn) - - if not order.open: - closed_orders.append(order) + if not order.open: + closed_orders.append(order) return transactions, commissions, closed_orders @@ -165,6 +257,7 @@ class ExchangeBlotter(Blotter): if attempt_index < self.retry_check_open_orders: sleep(self.retry_delay) return self.get_exchange_transactions(attempt_index + 1) + else: raise ExchangePortfolioDataError( data_type='order-status', diff --git a/catalyst/exchange/exchange_data_portal.py b/catalyst/exchange/exchange_data_portal.py index 4cfac2b7..f5a10a37 100644 --- a/catalyst/exchange/exchange_data_portal.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -13,7 +13,8 @@ from catalyst.exchange.exchange_errors import ( ExchangeRequestError, ExchangeBarDataError, PricingDataNotLoadedError) -from catalyst.exchange.exchange_utils import get_frequency, resample_history_df +from catalyst.exchange.exchange_utils import get_frequency, \ + resample_history_df, group_assets_by_exchange log = Logger('DataPortalExchange', level=LOG_LEVEL) @@ -38,13 +39,7 @@ class DataPortalExchangeBase(DataPortal): ffill=True, attempt_index=0): try: - exchange_assets = dict() - for asset in assets: - if asset.exchange not in exchange_assets: - exchange_assets[asset.exchange] = list() - - exchange_assets[asset.exchange].append(asset) - + exchange_assets = group_assets_by_exchange(assets) if len(exchange_assets) > 1: df_list = [] for exchange_name in exchange_assets: diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 0ae93ea8..e4f4cd7f 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -631,3 +631,14 @@ def from_ms_timestamp(ms): def get_epoch(): return pd.to_datetime('1970-1-1', utc=True) + + +def group_assets_by_exchange(assets): + exchange_assets = dict() + for asset in assets: + if asset.exchange not in exchange_assets: + exchange_assets[asset.exchange] = list() + + exchange_assets[asset.exchange].append(asset) + + return exchange_assets diff --git a/catalyst/exchange/factory.py b/catalyst/exchange/factory.py index 8099b208..b0002f32 100644 --- a/catalyst/exchange/factory.py +++ b/catalyst/exchange/factory.py @@ -6,8 +6,7 @@ from catalyst.exchange.exchange_utils import get_exchange_auth, \ get_exchange_folder -def get_exchange(exchange_name, base_currency=None, portfolio=None, - must_authenticate=False): +def get_exchange(exchange_name, base_currency=None, must_authenticate=False): exchange_auth = get_exchange_auth(exchange_name) has_auth = (exchange_auth['key'] != '' and exchange_auth['secret'] != '') @@ -24,7 +23,6 @@ def get_exchange(exchange_name, base_currency=None, portfolio=None, key=exchange_auth['key'], secret=exchange_auth['secret'], base_currency=base_currency, - portfolio=portfolio ) diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index a20490bc..f7219330 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -152,22 +152,9 @@ def _run(handle_data, exchanges = dict() for exchange_name in exchange_list: - # Looking for the portfolio from the cache first - portfolio = get_algo_object( - algo_name=algo_namespace, - key='portfolio_{}'.format(exchange_name), - environ=environ - ) - - if portfolio is None: - portfolio = ExchangePortfolio( - start if start is not None else pd.Timestamp.utcnow() - ) - exchanges[exchange_name] = get_exchange( exchange_name=exchange_name, base_currency=base_currency, - portfolio=portfolio, must_authenticate=live, )