mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-01 05:22:49 +08:00
Polishing error handling
This commit is contained in:
@@ -7,7 +7,7 @@ from catalyst.api import (
|
||||
record,
|
||||
get_open_orders,
|
||||
)
|
||||
from catalyst.exchange.exchange_errors import ExchangeRequestError
|
||||
from catalyst.errors import ZiplineError
|
||||
import matplotlib.pyplot as plt
|
||||
import pyfolio as pf
|
||||
|
||||
@@ -28,9 +28,10 @@ def initialize(context):
|
||||
context.retry_update_portfolio = 2
|
||||
context.retry_order = 2
|
||||
|
||||
context.errors = []
|
||||
|
||||
def handle_data(context, data):
|
||||
log.info('handling bar {}'.format(data.current_dt))
|
||||
|
||||
def _handle_data(context, data):
|
||||
# price_history = data.history(symbol('iot_usd'),
|
||||
# fields='price',
|
||||
# bar_count=20,
|
||||
@@ -114,7 +115,22 @@ def handle_data(context, data):
|
||||
leverage=context.account.leverage,
|
||||
)
|
||||
|
||||
pass
|
||||
|
||||
def handle_data(context, data):
|
||||
log.info('handling bar {}'.format(data.current_dt))
|
||||
try:
|
||||
_handle_data(context, data)
|
||||
except ZiplineError as e:
|
||||
log.warn('aborting the bar on error {}'.format(e))
|
||||
context.errors.append(e)
|
||||
|
||||
log.info('completed bar {}, total execution errors {}'.format(
|
||||
data.current_dt,
|
||||
len(context.errors)
|
||||
))
|
||||
|
||||
if len(context.errors) > 0:
|
||||
log.info('the errors:\n{}'.format(context.errors))
|
||||
|
||||
|
||||
def analyze(context, stats):
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from datetime import time
|
||||
from datetime import time, timedelta
|
||||
from time import sleep
|
||||
import logbook
|
||||
import signal
|
||||
@@ -30,6 +30,8 @@ from catalyst.utils.api_support import (
|
||||
from catalyst.utils.calendars.trading_calendar import days_at_time
|
||||
from catalyst.exchange.exchange_errors import (
|
||||
ExchangeRequestError,
|
||||
ExchangePortfolioDataError,
|
||||
ExchangeTransactionError
|
||||
)
|
||||
from catalyst.finance.performance.period import calc_period_stats
|
||||
|
||||
@@ -51,7 +53,7 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
self.retry_check_open_orders = 5
|
||||
self.retry_update_portfolio = 5
|
||||
self.retry_get_open_orders = 5
|
||||
self.retry_order = 1
|
||||
self.retry_order = 2
|
||||
self.retry_delay = 5
|
||||
|
||||
super(self.__class__, self).__init__(*args, **kwargs)
|
||||
@@ -158,6 +160,12 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
if attempt_index < self.retry_update_portfolio:
|
||||
sleep(self.retry_delay)
|
||||
self._update_portfolio(attempt_index + 1)
|
||||
else:
|
||||
raise ExchangePortfolioDataError(
|
||||
data_type='update-portfolio',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
def _check_open_orders(self, attempt_index=0):
|
||||
try:
|
||||
@@ -170,9 +178,13 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
sleep(self.retry_delay)
|
||||
return self._check_open_orders(attempt_index + 1)
|
||||
else:
|
||||
return list()
|
||||
raise ExchangePortfolioDataError(
|
||||
data_type='order-status',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
def prepare_period_stats(self, bar_date):
|
||||
def prepare_period_stats(self, start_dt, end_dt):
|
||||
"""
|
||||
Creates a dictionary representing the state of the tracker.
|
||||
|
||||
@@ -220,15 +232,17 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
|
||||
# we want the key to be absent, not just empty
|
||||
# Only include transactions for given dt
|
||||
stats['transactions'] = list(filter(
|
||||
lambda date:
|
||||
period.processed_transactions[date] if date == bar_date else None,
|
||||
period.processed_transactions))
|
||||
stats['transactions'] = dict()
|
||||
for date in period.processed_transactions:
|
||||
if start_dt <= date < end_dt:
|
||||
stats['transactions'][date] = \
|
||||
period.processed_transactions[date]
|
||||
|
||||
stats['orders'] = list(filter(
|
||||
lambda date:
|
||||
period.orders_by_modified if date == bar_date else None,
|
||||
period.orders_by_modified))
|
||||
stats['orders'] = dict()
|
||||
for date in period.orders_by_modified:
|
||||
if start_dt <= date < end_dt:
|
||||
stats['orders'][date] = \
|
||||
period.orders_by_modified[date]
|
||||
|
||||
return stats
|
||||
|
||||
@@ -255,10 +269,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
# Performance tracker and keep only minute and cumulative
|
||||
self.perf_tracker.update_performance()
|
||||
|
||||
stats = self.prepare_period_stats(data.current_dt)
|
||||
log.debug('the minute performance:\n{}'.format(stats))
|
||||
minute_stats = self.prepare_period_stats(
|
||||
data.current_dt, data.current_dt + timedelta(minutes=1))
|
||||
log.debug('the minute performance:\n{}'.format(minute_stats))
|
||||
|
||||
self.minute_perfs.append(stats)
|
||||
self.minute_perfs.append(minute_stats)
|
||||
|
||||
except Exception as e:
|
||||
log.warn('unable to calculate performance: {}'.format(e))
|
||||
@@ -284,7 +299,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
asset, amount, limit_price, stop_price, style,
|
||||
attempt_index + 1)
|
||||
else:
|
||||
return None
|
||||
raise ExchangeTransactionError(
|
||||
transaction_type='order',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
@api_method
|
||||
@disallowed_in_before_trading_start(OrderInBeforeTradingStart())
|
||||
@@ -319,7 +338,11 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
|
||||
sleep(self.retry_delay)
|
||||
return self._get_open_orders(asset, attempt_index + 1)
|
||||
else:
|
||||
return []
|
||||
raise ExchangePortfolioDataError(
|
||||
data_type='open-orders',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
@error_keywords(sid='Keyword argument `sid` is no longer supported for '
|
||||
'get_open_orders. Use `asset` instead.')
|
||||
|
||||
@@ -158,7 +158,6 @@ class Bitfinex(Exchange):
|
||||
# TODO: zipline likes rounded dates to match statistics, is this ok?
|
||||
date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp']))
|
||||
date = pytz.utc.localize(date)
|
||||
date = date.floor('1 min')
|
||||
order = ExchangeOrder(
|
||||
dt=date,
|
||||
asset=self.assets[order_status['symbol']],
|
||||
@@ -454,7 +453,7 @@ class Bitfinex(Exchange):
|
||||
sell_price_oco=0
|
||||
)
|
||||
|
||||
date = pd.Timestamp.utcnow().floor('1 min') # Making zipline happy
|
||||
date = pd.Timestamp.utcnow()
|
||||
try:
|
||||
response = self._request('order/new', req)
|
||||
exchange_order = response.json()
|
||||
@@ -603,7 +602,9 @@ class Bitfinex(Exchange):
|
||||
formatted_tickers = []
|
||||
for index, ticker in enumerate(tickers):
|
||||
if not len(ticker) == 11:
|
||||
raise ValueError('Invalid ticker: %s' % ticker)
|
||||
raise ExchangeRequestError(
|
||||
error='Invalid ticker in response: {}'.format(ticker)
|
||||
)
|
||||
|
||||
tick = dict(
|
||||
asset=assets[index],
|
||||
|
||||
@@ -13,9 +13,14 @@
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from time import sleep
|
||||
from catalyst.data.data_portal import DataPortal
|
||||
|
||||
from logbook import Logger
|
||||
from catalyst.exchange.exchange_errors import (
|
||||
ExchangeRequestError,
|
||||
ExchangeBarDataError
|
||||
)
|
||||
|
||||
log = Logger('DataPortalExchange')
|
||||
|
||||
@@ -23,8 +28,53 @@ log = Logger('DataPortalExchange')
|
||||
class DataPortalExchange(DataPortal):
|
||||
def __init__(self, exchange, *args, **kwargs):
|
||||
self.exchange = exchange
|
||||
|
||||
# TODO: put somewhere accessible by each algo
|
||||
self.retry_get_history_window = 5
|
||||
self.retry_get_spot_value = 5
|
||||
self.retry_delay = 5
|
||||
|
||||
super(DataPortalExchange, self).__init__(*args, **kwargs)
|
||||
|
||||
def _get_history_window(self,
|
||||
assets,
|
||||
end_dt,
|
||||
bar_count,
|
||||
frequency,
|
||||
field,
|
||||
data_frequency,
|
||||
ffill=True,
|
||||
attempt_index=0):
|
||||
try:
|
||||
return self.exchange.get_history_window(
|
||||
assets,
|
||||
end_dt,
|
||||
bar_count,
|
||||
frequency,
|
||||
field,
|
||||
data_frequency,
|
||||
ffill)
|
||||
except ExchangeRequestError as e:
|
||||
log.warn(
|
||||
'get history attempt {}: {}'.format(attempt_index, e)
|
||||
)
|
||||
if attempt_index < self.retry_get_history_window:
|
||||
sleep(self.retry_delay)
|
||||
return self._get_history_window(assets,
|
||||
end_dt,
|
||||
bar_count,
|
||||
frequency,
|
||||
field,
|
||||
data_frequency,
|
||||
ffill,
|
||||
attempt_index + 1)
|
||||
else:
|
||||
raise ExchangeBarDataError(
|
||||
data_type='history',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
def get_history_window(self,
|
||||
assets,
|
||||
end_dt,
|
||||
@@ -33,17 +83,36 @@ class DataPortalExchange(DataPortal):
|
||||
field,
|
||||
data_frequency,
|
||||
ffill=True):
|
||||
return self.exchange.get_history_window(
|
||||
assets,
|
||||
end_dt,
|
||||
bar_count,
|
||||
frequency,
|
||||
field,
|
||||
data_frequency,
|
||||
ffill)
|
||||
return self._get_history_window(assets,
|
||||
end_dt,
|
||||
bar_count,
|
||||
frequency,
|
||||
field,
|
||||
data_frequency,
|
||||
ffill)
|
||||
|
||||
def _get_spot_value(self, assets, field, dt, data_frequency,
|
||||
attempt_index=0):
|
||||
try:
|
||||
return self.exchange.get_spot_value(assets, field, dt,
|
||||
data_frequency)
|
||||
except ExchangeRequestError as e:
|
||||
log.warn(
|
||||
'get spot value attempt {}: {}'.format(attempt_index, e)
|
||||
)
|
||||
if attempt_index < self.retry_get_spot_value:
|
||||
sleep(self.retry_delay)
|
||||
return self._get_spot_value(assets, field, dt, data_frequency,
|
||||
attempt_index + 1)
|
||||
else:
|
||||
raise ExchangeBarDataError(
|
||||
data_type='spot',
|
||||
attempts=attempt_index,
|
||||
error=e
|
||||
)
|
||||
|
||||
def get_spot_value(self, assets, field, dt, data_frequency):
|
||||
return self.exchange.get_spot_value(assets, field, dt, data_frequency)
|
||||
return self._get_spot_value(assets, field, dt, data_frequency)
|
||||
|
||||
def get_adjusted_value(self, asset, field, dt,
|
||||
perspective_dt,
|
||||
|
||||
@@ -101,7 +101,7 @@ class Exchange:
|
||||
transaction = Transaction(
|
||||
asset=order.asset,
|
||||
amount=order.amount,
|
||||
dt=pd.Timestamp.utcnow().floor('1 min'),
|
||||
dt=pd.Timestamp.utcnow(),
|
||||
price=order.executed_price,
|
||||
order_id=order.id,
|
||||
commission=order.commission
|
||||
|
||||
@@ -13,6 +13,27 @@ class ExchangeRequestErrorTooManyAttempts(ZiplineError):
|
||||
).strip()
|
||||
|
||||
|
||||
class ExchangeBarDataError(ZiplineError):
|
||||
msg = (
|
||||
'Unable to retrieve bar data: {data_type}, ' +
|
||||
'giving up after {attempts} attempts: {error}'
|
||||
).strip()
|
||||
|
||||
|
||||
class ExchangePortfolioDataError(ZiplineError):
|
||||
msg = (
|
||||
'Unable to retrieve portfolio data: {data_type}, ' +
|
||||
'giving up after {attempts} attempts: {error}'
|
||||
).strip()
|
||||
|
||||
|
||||
class ExchangeTransactionError(ZiplineError):
|
||||
msg = (
|
||||
'Unable to execute transaction: {transaction_type}, ' +
|
||||
'giving up after {attempts} attempts: {error}'
|
||||
).strip()
|
||||
|
||||
|
||||
class InvalidHistoryFrequencyError(ZiplineError):
|
||||
msg = (
|
||||
'History frequency {frequency} not supported by the exchange.'
|
||||
|
||||
@@ -89,6 +89,7 @@ class ExchangePortfolio(Portfolio):
|
||||
self.capital_used += order.amount * order.executed_price
|
||||
|
||||
if order_position.cost_basis > 0:
|
||||
# TODO: consider buy orders only
|
||||
order_position.cost_basis = np.average(
|
||||
[order_position.cost_basis, order.executed_price],
|
||||
weights=[order_position.amount, order.amount]
|
||||
|
||||
Reference in New Issue
Block a user