BLD: making some adjustment to the blotter to improve paper trading

This commit is contained in:
fredfortier
2017-12-06 18:02:02 -05:00
parent dcdf4f77db
commit e42276affa
9 changed files with 179 additions and 287 deletions
+1 -1
View File
@@ -276,5 +276,5 @@ if __name__ == '__main__':
algo_namespace=NAMESPACE,
base_currency='eth',
live_graph=False,
simulate_orders=True
simulate_orders=False
)
+1 -17
View File
@@ -35,8 +35,7 @@ SUPPORTED_EXCHANGES = dict(
class CCXT(Exchange):
def __init__(self, exchange_name, key, secret, base_currency,
portfolio=None):
def __init__(self, exchange_name, key, secret, base_currency):
log.debug(
'finding {} in CCXT exchanges:\n{}'.format(
exchange_name, ccxt.exchanges
@@ -69,7 +68,6 @@ class CCXT(Exchange):
self.load_assets()
self.base_currency = base_currency
self._portfolio = portfolio
self.transactions = defaultdict(list)
self.num_candles_limit = 2000
@@ -508,18 +506,7 @@ class CCXT(Exchange):
return orders
def _get_asset_from_order(self, order_id):
open_orders = self.portfolio.open_orders
order = next(
(open_orders[id] for id in open_orders if id == order_id),
None
) # type: Order
return order.asset if order is not None else None
def get_order(self, order_id, asset_or_symbol=None):
if asset_or_symbol is None and self.portfolio is not None:
asset_or_symbol = self._get_asset_from_order(order_id)
if asset_or_symbol is None:
log.debug(
'order not found in memory, the request might fail '
@@ -540,9 +527,6 @@ class CCXT(Exchange):
order_id = order_param.id \
if isinstance(order_param, Order) else order_param
if asset_or_symbol is None and self.portfolio is not None:
asset_or_symbol = self._get_asset_from_order(order_id)
if asset_or_symbol is None:
log.debug(
'order not found in memory, cancelling order might fail '
+20 -115
View File
@@ -20,7 +20,6 @@ from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \
NoDataAvailableOnExchange, NoValueForField
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
ExchangeLimitOrder, ExchangeStopOrder
from catalyst.exchange.exchange_portfolio import ExchangePortfolio
from catalyst.exchange.exchange_utils import get_exchange_symbols, \
get_frequency, resample_history_df
from catalyst.finance.order import ORDER_STATUS
@@ -36,7 +35,6 @@ class Exchange:
self.name = None
self.assets = []
self._symbol_maps = [None, None]
self._portfolio = None
self.minute_writer = None
self.minute_reader = None
self.base_currency = None
@@ -46,27 +44,6 @@ class Exchange:
self.request_cpt = None
self.bundle = ExchangeBundle(self.name)
@property
def positions(self):
return self.portfolio.positions
@property
def portfolio(self):
"""
The exchange portfolio
Returns
-------
ExchangePortfolio
"""
if self._portfolio is None:
self._portfolio = ExchangePortfolio(
start_date=pd.Timestamp.utcnow()
)
self.synchronize_portfolio()
return self._portfolio
@abstractproperty
def account(self):
pass
@@ -313,53 +290,6 @@ class Exchange:
"""
pass
def check_open_orders(self):
"""
Loop through the list of open orders in the Portfolio object.
For each executed order found, create a transaction and apply to the
Portfolio.
Returns
-------
list[Transaction]
"""
if self.portfolio.open_orders:
for order_id in list(self.portfolio.open_orders):
log.debug('found open order: {}'.format(order_id))
order, executed_price = self.get_order(order_id)
log.debug(
'got updated order {} {}'.format(
order, executed_price
)
)
if order.status == ORDER_STATUS.FILLED:
transaction = Transaction(
asset=order.asset,
amount=order.amount,
dt=pd.Timestamp.utcnow(),
price=executed_price,
order_id=order.id,
commission=order.commission
)
yield order, transaction
# self.portfolio.execute_order(order, transaction)
elif order.status == ORDER_STATUS.CANCELLED:
# self.portfolio.remove_order(order)
yield order, None
else:
delta = pd.Timestamp.utcnow() - order.dt
log.info(
'order {order_id} still open after {delta}'.format(
order_id=order_id,
delta=delta
)
)
def get_spot_value(self, assets, field, dt=None, data_frequency='minute'):
"""
Public API method that returns a scalar value representing the value
@@ -668,7 +598,7 @@ class Exchange:
return df
def synchronize_portfolio(self):
def calculate_totals(self, positions=None):
"""
Update the portfolio cash and position balances based on the
latest ticker prices.
@@ -677,42 +607,36 @@ class Exchange:
log.debug('synchronizing portfolio with exchange {}'.format(self.name))
balances = self.get_balances()
base_position_available = balances[self.base_currency]['free'] \
cash = balances[self.base_currency]['free'] \
if self.base_currency in balances else None
if base_position_available is None:
if cash is None:
raise BaseCurrencyNotFoundError(
base_currency=self.base_currency,
exchange=self.name.title()
exchange=self.name
)
log.debug('found base currency balance: {}'.format(cash))
portfolio = self._portfolio
portfolio.cash = base_position_available
log.debug('found base currency balance: {}'.format(portfolio.cash))
if portfolio.starting_cash is None:
portfolio.starting_cash = portfolio.cash
if portfolio.positions:
assets = list(portfolio.positions.keys())
positions_value = 0.0
if positions is not None:
assets = set([position.asset for position in positions])
tickers = self.tickers(assets)
log.debug('got tickers for positions: {}'.format(tickers))
portfolio.positions_value = 0.0
for asset in tickers:
# TODO: convert if the position is not in the base currency
ticker = tickers[asset]
position = portfolio.positions[asset]
positions = [p for p in positions if p.asset == asset]
position.last_sale_price = ticker['last_price']
position.last_sale_date = ticker['last_traded']
for position in positions:
position.last_sale_price = ticker['last_price']
position.last_sale_date = ticker['last_traded']
portfolio.positions_value += \
position.amount * position.last_sale_price
portfolio.portfolio_value = \
portfolio.positions_value + portfolio.cash
positions_value += \
position.amount * position.last_sale_price
def order(self, asset, amount, limit_price=None, stop_price=None,
style=None):
return cash, positions_value
def order(self, asset, amount, style):
"""Place an order.
Parameters
@@ -771,22 +695,8 @@ class Exchange:
)
is_buy = (amount > 0)
display_price = style.get_limit_price(is_buy)
if limit_price is not None and stop_price is not None:
style = ExchangeStopLimitOrder(
limit_price, stop_price, exchange=self.name
)
elif limit_price is not None:
style = ExchangeLimitOrder(limit_price, exchange=self.name)
elif stop_price is not None:
style = ExchangeStopOrder(stop_price, exchange=self.name)
else:
style = MarketOrder(exchange=self.name)
display_price = limit_price if limit_price is not None else stop_price
log.debug(
'issuing {side} order of {amount} {symbol} for {type}: {price}'.format(
side='buy' if is_buy else 'sell',
@@ -797,12 +707,7 @@ class Exchange:
)
)
order = self.create_order(asset, amount, is_buy, style)
if order:
self._portfolio.create_order(order)
return order.id
else:
return None
return self.create_order(asset, amount, is_buy, style)
# The methods below must be implemented for each exchange.
@abstractmethod
+39 -120
View File
@@ -21,23 +21,19 @@ from time import sleep
import logbook
import pandas as pd
from catalyst.assets._assets import TradingPair
import catalyst.protocol as zp
from catalyst.algorithm import TradingAlgorithm
from catalyst.constants import LOG_LEVEL
from catalyst.errors import OrderInBeforeTradingStart
from catalyst.exchange.exchange_blotter import ExchangeBlotter
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangePortfolioDataError,
ExchangeTransactionError,
OrphanOrderError, OrderTypeNotSupported)
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
ExchangeLimitOrder, ExchangeStopOrder
OrderTypeNotSupported)
from catalyst.exchange.exchange_execution import ExchangeLimitOrder
from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object, \
get_algo_folder, get_algo_df, \
save_algo_df
save_algo_df, group_assets_by_exchange
from catalyst.exchange.live_graph_clock import LiveGraphClock
from catalyst.exchange.simple_clock import SimpleClock
from catalyst.exchange.stats_utils import get_pretty_stats
@@ -45,10 +41,8 @@ from catalyst.finance.execution import MarketOrder
from catalyst.finance.performance.period import calc_period_stats
from catalyst.gens.tradesimulation import AlgorithmSimulator
from catalyst.utils.api_support import (
api_method,
disallowed_in_before_trading_start)
from catalyst.utils.input_validation import error_keywords, ensure_upper_case, \
expect_types
api_method)
from catalyst.utils.input_validation import error_keywords, ensure_upper_case
from catalyst.utils.math_utils import round_nearest
from catalyst.utils.preprocess import preprocess
@@ -75,7 +69,8 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm):
data_frequency=self.data_frequency,
# Default to NeverCancel in catalyst
cancel_policy=self.cancel_policy,
simulate_orders=self.simulate_orders
simulate_orders=self.simulate_orders,
exchanges=self.exchanges
)
@staticmethod
@@ -441,22 +436,25 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
def updated_account(self):
return self.perf_tracker.get_account(False)
def _synchronize_portfolio(self, attempt_index=0):
def update_positions(self, attempt_index=0):
tracker = self.perf_tracker.position_tracker
try:
for exchange_name in self.exchanges:
exchange = self.exchanges[exchange_name]
# Position keys correspond to assets
assets = list(tracker.positions)
exchange_assets = group_assets_by_exchange(assets)
for exchange_name in exchange_assets:
assets = exchange_assets[exchange_name]
exchange_positions = \
[tracker.positions[asset] for asset in assets]
exchange.synchronize_portfolio()
exchange = self.exchanges[exchange_name] # Type: Exchange
cash, positions_value = \
exchange.calculate_totals(exchange_positions)
# Applying the updated last_sales_price to the positions
# in the performance tracker. This seems a bit redundant
# but it will make sense when we have multiple exchange portfolios
# feeding into the same performance tracker.
tracker = self.perf_tracker.todays_performance.position_tracker
for asset in exchange.portfolio.positions:
position = exchange.portfolio.positions[asset]
for position in exchange_positions:
tracker.update_position(
asset=asset,
asset=position.asset,
last_sale_date=position.last_sale_date,
last_sale_price=position.last_sale_price
)
@@ -467,7 +465,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
)
if attempt_index < self.retry_synchronize_portfolio:
sleep(self.retry_delay)
self._synchronize_portfolio(attempt_index + 1)
self.update_positions(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='update-portfolio',
@@ -576,7 +574,10 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
order = self.blotter.orders[transaction.order_id]
self.perf_tracker.process_order(order)
self.perf_tracker.update_performance()
if len(new_transactions) > 0:
self.perf_tracker.update_performance()
self.update_positions()
if self._handle_data:
self._handle_data(self, data)
@@ -643,103 +644,21 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
log.warn('unable to save minute perfs to disk: {}'.format(e))
try:
for exchange_name in self.exchanges:
exchange = self.exchanges[exchange_name]
save_algo_object(
algo_name=self.algo_namespace,
key='portfolio_{}'.format(exchange_name),
obj=exchange.portfolio
)
blotter_params = dict(
open_orders=self.blotter.open_orders,
orders=self.blotter.orders,
new_orders=self.blotter.new_orders,
data_frequency=self.blotter.data_frequency,
current_dt=self.blotter.current_dt,
)
save_algo_object(
algo_name=self.algo_namespace,
key='blotter',
obj=blotter_params,
)
except Exception as e:
log.warn('unable to save portfolio to disk: {}'.format(e))
def _order(self,
asset,
amount,
limit_price=None,
stop_price=None,
style=None,
attempt_index=0):
try:
exchange = self.exchanges[asset.exchange]
return exchange.order(asset, amount, limit_price,
stop_price,
style)
except ExchangeRequestError as e:
log.warn(
'order attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_order:
sleep(self.retry_delay)
return self._order(
asset, amount, limit_price, stop_price, style,
attempt_index + 1)
else:
raise ExchangeTransactionError(
transaction_type='order',
attempts=attempt_index,
error=e
)
@api_method
@disallowed_in_before_trading_start(OrderInBeforeTradingStart())
@expect_types(asset=TradingPair)
def order(self,
asset,
amount,
limit_price=None,
stop_price=None,
style=None):
"""
We use the exchange specific portfolio to place orders.
The cumulative portfolio does not contain open orders but exchange
portfolios do.
Parameters
----------
asset: TradingPair
amount: float
limit_price: float
stop_price: float
style: Style
order: Order
The catalyst order object or None
"""
if self.simulate_orders:
order_id = super(ExchangeTradingAlgorithmLive, self).order(
asset, amount, limit_price, stop_price, style
)
log.debug('created a simulated order {}'.format(order_id))
else:
amount, style = self._calculate_order(
asset, amount, limit_price, stop_price, style
)
order_id = self._order(
asset, amount, limit_price, stop_price, style
)
if order_id is not None:
current_order = None
for order in self.blotter.open_orders[asset]:
if current_order is None and order.id == order_id:
self.perf_tracker.process_order(order)
current_order = order
if current_order is not None:
return current_order
else:
raise OrphanOrderError(
order_id=order_id,
exchange=asset.exchange
)
else:
log.warn('unable to order {} {} on exchange {}'.format(
amount, asset.symbol, asset.exchange))
return None
@api_method
def batch_market_order(self, share_counts):
raise NotImplementedError()
+103 -10
View File
@@ -1,15 +1,18 @@
from time import sleep
import pandas as pd
from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.constants import LOG_LEVEL
from catalyst.exchange.exchange_errors import ExchangeRequestError, \
ExchangePortfolioDataError
ExchangePortfolioDataError, OrphanOrderError, ExchangeTransactionError
from catalyst.finance.blotter import Blotter
from catalyst.finance.commission import CommissionModel
from catalyst.finance.order import ORDER_STATUS
from catalyst.finance.slippage import SlippageModel
from catalyst.finance.transaction import create_transaction
from catalyst.finance.transaction import create_transaction, Transaction
from catalyst.utils.input_validation import expect_types
log = Logger('exchange_blotter', level=LOG_LEVEL)
@@ -127,6 +130,11 @@ class ExchangeBlotter(Blotter):
def __init__(self, *args, **kwargs):
self.simulate_orders = kwargs.pop('simulate_orders', False)
self.exchanges = kwargs.pop('exchanges', None)
if not self.exchanges:
raise ValueError('ExchangeBlotter must have an `exchanges` '
'attribute.')
super(ExchangeBlotter, self).__init__(*args, **kwargs)
# Using the equity models for now
@@ -139,22 +147,106 @@ class ExchangeBlotter(Blotter):
TradingPair: TradingPairFeeSchedule()
}
def exchange_order(self, asset, amount, style=None, attempt_index=0):
try:
exchange = self.exchanges[asset.exchange]
return exchange.order(
asset, amount, style
)
except ExchangeRequestError as e:
log.warn(
'order attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_order:
sleep(self.retry_delay)
return self.exchange_order(
asset, amount, style, attempt_index + 1
)
else:
raise ExchangeTransactionError(
transaction_type='order',
attempts=attempt_index,
error=e
)
@expect_types(asset=TradingPair)
def order(self, asset, amount, style, order_id=None):
if self.simulate_orders:
return super(ExchangeBlotter, self).order(
asset, amount, style, order_id
)
else:
order = self.exchange_order(
asset, amount, style
)
self.open_orders[order.asset].append(order)
self.orders[order.id] = order
self.new_orders.append(order)
return order.id
def check_open_orders(self):
"""
Loop through the list of open orders in the Portfolio object.
For each executed order found, create a transaction and apply to the
Portfolio.
Returns
-------
list[Transaction]
"""
for asset in self.open_orders:
exchange = self.exchanges[asset.exchange]
for order in self.open_orders[asset]:
log.debug('found open order: {}'.format(order.id))
order, executed_price = exchange.get_order(order.id, asset)
log.debug(
'got updated order {} {}'.format(
order, executed_price
)
)
if order.status == ORDER_STATUS.FILLED:
transaction = Transaction(
asset=order.asset,
amount=order.amount,
dt=pd.Timestamp.utcnow(),
price=executed_price,
order_id=order.id,
commission=order.commission
)
yield order, transaction
elif order.status == ORDER_STATUS.CANCELLED:
yield order, None
else:
delta = pd.Timestamp.utcnow() - order.dt
log.info(
'order {order_id} still open after {delta}'.format(
order_id=order.id,
delta=delta
)
)
def get_exchange_transactions(self, attempt_index=0):
closed_orders = []
transactions = []
commissions = []
try:
for exchange_name in self.exchanges:
exchange = self.exchanges[exchange_name]
for order, txn in exchange.check_open_orders():
for order, txn in self.check_open_orders():
order.dt = txn.dt
order.dt = txn.dt
transactions.append(txn)
transactions.append(txn)
if not order.open:
closed_orders.append(order)
if not order.open:
closed_orders.append(order)
return transactions, commissions, closed_orders
@@ -165,6 +257,7 @@ class ExchangeBlotter(Blotter):
if attempt_index < self.retry_check_open_orders:
sleep(self.retry_delay)
return self.get_exchange_transactions(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='order-status',
+3 -8
View File
@@ -13,7 +13,8 @@ from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangeBarDataError,
PricingDataNotLoadedError)
from catalyst.exchange.exchange_utils import get_frequency, resample_history_df
from catalyst.exchange.exchange_utils import get_frequency, \
resample_history_df, group_assets_by_exchange
log = Logger('DataPortalExchange', level=LOG_LEVEL)
@@ -38,13 +39,7 @@ class DataPortalExchangeBase(DataPortal):
ffill=True,
attempt_index=0):
try:
exchange_assets = dict()
for asset in assets:
if asset.exchange not in exchange_assets:
exchange_assets[asset.exchange] = list()
exchange_assets[asset.exchange].append(asset)
exchange_assets = group_assets_by_exchange(assets)
if len(exchange_assets) > 1:
df_list = []
for exchange_name in exchange_assets:
+11
View File
@@ -631,3 +631,14 @@ def from_ms_timestamp(ms):
def get_epoch():
return pd.to_datetime('1970-1-1', utc=True)
def group_assets_by_exchange(assets):
exchange_assets = dict()
for asset in assets:
if asset.exchange not in exchange_assets:
exchange_assets[asset.exchange] = list()
exchange_assets[asset.exchange].append(asset)
return exchange_assets
+1 -3
View File
@@ -6,8 +6,7 @@ from catalyst.exchange.exchange_utils import get_exchange_auth, \
get_exchange_folder
def get_exchange(exchange_name, base_currency=None, portfolio=None,
must_authenticate=False):
def get_exchange(exchange_name, base_currency=None, must_authenticate=False):
exchange_auth = get_exchange_auth(exchange_name)
has_auth = (exchange_auth['key'] != '' and exchange_auth['secret'] != '')
@@ -24,7 +23,6 @@ def get_exchange(exchange_name, base_currency=None, portfolio=None,
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=base_currency,
portfolio=portfolio
)
-13
View File
@@ -152,22 +152,9 @@ def _run(handle_data,
exchanges = dict()
for exchange_name in exchange_list:
# Looking for the portfolio from the cache first
portfolio = get_algo_object(
algo_name=algo_namespace,
key='portfolio_{}'.format(exchange_name),
environ=environ
)
if portfolio is None:
portfolio = ExchangePortfolio(
start if start is not None else pd.Timestamp.utcnow()
)
exchanges[exchange_name] = get_exchange(
exchange_name=exchange_name,
base_currency=base_currency,
portfolio=portfolio,
must_authenticate=live,
)