From ef403390588bc8c1877b151ee8364ffb314fc4c3 Mon Sep 17 00:00:00 2001 From: Frederic Fortier Date: Sun, 20 Aug 2017 14:04:37 -0400 Subject: [PATCH] Polishing error handling --- catalyst/examples/buy_the_dip_live.py | 24 +++++-- catalyst/exchange/algorithm_exchange.py | 57 ++++++++++----- catalyst/exchange/bitfinex.py | 7 +- catalyst/exchange/data_portal_exchange.py | 87 ++++++++++++++++++++--- catalyst/exchange/exchange.py | 2 +- catalyst/exchange/exchange_errors.py | 21 ++++++ catalyst/exchange/exchange_portfolio.py | 1 + 7 files changed, 165 insertions(+), 34 deletions(-) diff --git a/catalyst/examples/buy_the_dip_live.py b/catalyst/examples/buy_the_dip_live.py index 8c71e0dc..a769df27 100644 --- a/catalyst/examples/buy_the_dip_live.py +++ b/catalyst/examples/buy_the_dip_live.py @@ -7,7 +7,7 @@ from catalyst.api import ( record, get_open_orders, ) -from catalyst.exchange.exchange_errors import ExchangeRequestError +from catalyst.errors import ZiplineError import matplotlib.pyplot as plt import pyfolio as pf @@ -28,9 +28,10 @@ def initialize(context): context.retry_update_portfolio = 2 context.retry_order = 2 + context.errors = [] -def handle_data(context, data): - log.info('handling bar {}'.format(data.current_dt)) + +def _handle_data(context, data): # price_history = data.history(symbol('iot_usd'), # fields='price', # bar_count=20, @@ -114,7 +115,22 @@ def handle_data(context, data): leverage=context.account.leverage, ) - pass + +def handle_data(context, data): + log.info('handling bar {}'.format(data.current_dt)) + try: + _handle_data(context, data) + except ZiplineError as e: + log.warn('aborting the bar on error {}'.format(e)) + context.errors.append(e) + + log.info('completed bar {}, total execution errors {}'.format( + data.current_dt, + len(context.errors) + )) + + if len(context.errors) > 0: + log.info('the errors:\n{}'.format(context.errors)) def analyze(context, stats): diff --git a/catalyst/exchange/algorithm_exchange.py b/catalyst/exchange/algorithm_exchange.py index 4d52d0cb..ed9f5b0d 100644 --- a/catalyst/exchange/algorithm_exchange.py +++ b/catalyst/exchange/algorithm_exchange.py @@ -10,7 +10,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import time +from datetime import time, timedelta from time import sleep import logbook import signal @@ -30,6 +30,8 @@ from catalyst.utils.api_support import ( from catalyst.utils.calendars.trading_calendar import days_at_time from catalyst.exchange.exchange_errors import ( ExchangeRequestError, + ExchangePortfolioDataError, + ExchangeTransactionError ) from catalyst.finance.performance.period import calc_period_stats @@ -51,7 +53,7 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): self.retry_check_open_orders = 5 self.retry_update_portfolio = 5 self.retry_get_open_orders = 5 - self.retry_order = 1 + self.retry_order = 2 self.retry_delay = 5 super(self.__class__, self).__init__(*args, **kwargs) @@ -158,6 +160,12 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): if attempt_index < self.retry_update_portfolio: sleep(self.retry_delay) self._update_portfolio(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='update-portfolio', + attempts=attempt_index, + error=e + ) def _check_open_orders(self, attempt_index=0): try: @@ -170,9 +178,13 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): sleep(self.retry_delay) return self._check_open_orders(attempt_index + 1) else: - return list() + raise ExchangePortfolioDataError( + data_type='order-status', + attempts=attempt_index, + error=e + ) - def prepare_period_stats(self, bar_date): + def prepare_period_stats(self, start_dt, end_dt): """ Creates a dictionary representing the state of the tracker. @@ -220,15 +232,17 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): # we want the key to be absent, not just empty # Only include transactions for given dt - stats['transactions'] = list(filter( - lambda date: - period.processed_transactions[date] if date == bar_date else None, - period.processed_transactions)) + stats['transactions'] = dict() + for date in period.processed_transactions: + if start_dt <= date < end_dt: + stats['transactions'][date] = \ + period.processed_transactions[date] - stats['orders'] = list(filter( - lambda date: - period.orders_by_modified if date == bar_date else None, - period.orders_by_modified)) + stats['orders'] = dict() + for date in period.orders_by_modified: + if start_dt <= date < end_dt: + stats['orders'][date] = \ + period.orders_by_modified[date] return stats @@ -255,10 +269,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): # Performance tracker and keep only minute and cumulative self.perf_tracker.update_performance() - stats = self.prepare_period_stats(data.current_dt) - log.debug('the minute performance:\n{}'.format(stats)) + minute_stats = self.prepare_period_stats( + data.current_dt, data.current_dt + timedelta(minutes=1)) + log.debug('the minute performance:\n{}'.format(minute_stats)) - self.minute_perfs.append(stats) + self.minute_perfs.append(minute_stats) except Exception as e: log.warn('unable to calculate performance: {}'.format(e)) @@ -284,7 +299,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): asset, amount, limit_price, stop_price, style, attempt_index + 1) else: - return None + raise ExchangeTransactionError( + transaction_type='order', + attempts=attempt_index, + error=e + ) @api_method @disallowed_in_before_trading_start(OrderInBeforeTradingStart()) @@ -319,7 +338,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): sleep(self.retry_delay) return self._get_open_orders(asset, attempt_index + 1) else: - return [] + raise ExchangePortfolioDataError( + data_type='open-orders', + attempts=attempt_index, + error=e + ) @error_keywords(sid='Keyword argument `sid` is no longer supported for ' 'get_open_orders. Use `asset` instead.') diff --git a/catalyst/exchange/bitfinex.py b/catalyst/exchange/bitfinex.py index 2b6b885f..ffcd888e 100644 --- a/catalyst/exchange/bitfinex.py +++ b/catalyst/exchange/bitfinex.py @@ -158,7 +158,6 @@ class Bitfinex(Exchange): # TODO: zipline likes rounded dates to match statistics, is this ok? date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) date = pytz.utc.localize(date) - date = date.floor('1 min') order = ExchangeOrder( dt=date, asset=self.assets[order_status['symbol']], @@ -454,7 +453,7 @@ class Bitfinex(Exchange): sell_price_oco=0 ) - date = pd.Timestamp.utcnow().floor('1 min') # Making zipline happy + date = pd.Timestamp.utcnow() try: response = self._request('order/new', req) exchange_order = response.json() @@ -603,7 +602,9 @@ class Bitfinex(Exchange): formatted_tickers = [] for index, ticker in enumerate(tickers): if not len(ticker) == 11: - raise ValueError('Invalid ticker: %s' % ticker) + raise ExchangeRequestError( + error='Invalid ticker in response: {}'.format(ticker) + ) tick = dict( asset=assets[index], diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py index 74efc073..a747123f 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/data_portal_exchange.py @@ -13,9 +13,14 @@ import pandas as pd +from time import sleep from catalyst.data.data_portal import DataPortal from logbook import Logger +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangeBarDataError +) log = Logger('DataPortalExchange') @@ -23,8 +28,53 @@ log = Logger('DataPortalExchange') class DataPortalExchange(DataPortal): def __init__(self, exchange, *args, **kwargs): self.exchange = exchange + + # TODO: put somewhere accessible by each algo + self.retry_get_history_window = 5 + self.retry_get_spot_value = 5 + self.retry_delay = 5 + super(DataPortalExchange, self).__init__(*args, **kwargs) + def _get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True, + attempt_index=0): + try: + return self.exchange.get_history_window( + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + except ExchangeRequestError as e: + log.warn( + 'get history attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_history_window: + sleep(self.retry_delay) + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='history', + attempts=attempt_index, + error=e + ) + def get_history_window(self, assets, end_dt, @@ -33,17 +83,36 @@ class DataPortalExchange(DataPortal): field, data_frequency, ffill=True): - return self.exchange.get_history_window( - assets, - end_dt, - bar_count, - frequency, - field, - data_frequency, - ffill) + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + + def _get_spot_value(self, assets, field, dt, data_frequency, + attempt_index=0): + try: + return self.exchange.get_spot_value(assets, field, dt, + data_frequency) + except ExchangeRequestError as e: + log.warn( + 'get spot value attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_spot_value: + sleep(self.retry_delay) + return self._get_spot_value(assets, field, dt, data_frequency, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='spot', + attempts=attempt_index, + error=e + ) def get_spot_value(self, assets, field, dt, data_frequency): - return self.exchange.get_spot_value(assets, field, dt, data_frequency) + return self._get_spot_value(assets, field, dt, data_frequency) def get_adjusted_value(self, asset, field, dt, perspective_dt, diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 6c2f5ea2..1c41c312 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -101,7 +101,7 @@ class Exchange: transaction = Transaction( asset=order.asset, amount=order.amount, - dt=pd.Timestamp.utcnow().floor('1 min'), + dt=pd.Timestamp.utcnow(), price=order.executed_price, order_id=order.id, commission=order.commission diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index 42d960f8..e9cf44f7 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -13,6 +13,27 @@ class ExchangeRequestErrorTooManyAttempts(ZiplineError): ).strip() +class ExchangeBarDataError(ZiplineError): + msg = ( + 'Unable to retrieve bar data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangePortfolioDataError(ZiplineError): + msg = ( + 'Unable to retrieve portfolio data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangeTransactionError(ZiplineError): + msg = ( + 'Unable to execute transaction: {transaction_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + class InvalidHistoryFrequencyError(ZiplineError): msg = ( 'History frequency {frequency} not supported by the exchange.' diff --git a/catalyst/exchange/exchange_portfolio.py b/catalyst/exchange/exchange_portfolio.py index fa7ce0b8..192a7ebb 100644 --- a/catalyst/exchange/exchange_portfolio.py +++ b/catalyst/exchange/exchange_portfolio.py @@ -89,6 +89,7 @@ class ExchangePortfolio(Portfolio): self.capital_used += order.amount * order.executed_price if order_position.cost_basis > 0: + # TODO: consider buy orders only order_position.cost_basis = np.average( [order_position.cost_basis, order.executed_price], weights=[order_position.amount, order.amount]