From ddecd6bb48cc114adc8a38ff98aa0a6cd6730257 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Thu, 21 Sep 2017 19:05:16 -0400 Subject: [PATCH] First working version with the backtest and live modes executing the same algorithm. --- .../buy_low_sell_high_neo_with_interface.py | 42 +++--- catalyst/exchange/bitfinex/bitfinex.py | 2 +- catalyst/exchange/data_portal_exchange.py | 42 ++++-- catalyst/exchange/exchange_algorithm.py | 40 +++++ catalyst/exchange/exchange_blotter.py | 141 +++++++++++++++++- catalyst/exchange/exchange_bundle.py | 1 + tests/exchange/test_bundle.py | 7 +- 7 files changed, 232 insertions(+), 43 deletions(-) diff --git a/catalyst/examples/buy_low_sell_high_neo_with_interface.py b/catalyst/examples/buy_low_sell_high_neo_with_interface.py index c453786b..4fc81028 100644 --- a/catalyst/examples/buy_low_sell_high_neo_with_interface.py +++ b/catalyst/examples/buy_low_sell_high_neo_with_interface.py @@ -33,24 +33,25 @@ def initialize(context): def _handle_data(context, data): - prices = data.history( - context.asset, - fields='price', - bar_count=20, - frequency='30m' - ) - rsi = talib.RSI(prices.values, timeperiod=14)[-1] - log.info('got rsi: {}'.format(rsi)) + # prices = data.history( + # context.asset, + # fields='price', + # bar_count=20, + # frequency='30m' + # ) + # rsi = talib.RSI(prices.values, timeperiod=14)[-1] + # log.info('got rsi: {}'.format(rsi)) # Buying more when RSI is low, this should lower our cost basis - if rsi <= 30: - buy_increment = 1 - elif rsi <= 40: - buy_increment = 0.5 - elif rsi <= 70: - buy_increment = 0.1 - else: - buy_increment = None + # if rsi <= 30: + # buy_increment = 1 + # elif rsi <= 40: + # buy_increment = 0.5 + # elif rsi <= 70: + # buy_increment = 0.1 + # else: + # buy_increment = None + buy_increment = 0.1 cash = context.portfolio.cash log.info('base currency available: {cash}'.format(cash=cash)) @@ -62,10 +63,10 @@ def _handle_data(context, data): log.warn('no pricing data') return - record(price=price, rsi=rsi) + record(price=price) orders = get_open_orders(context.asset) - if orders: + if len(orders) > 0: log.info('skipping bar until all open orders execute') return @@ -104,7 +105,6 @@ def _handle_data(context, data): if is_buy: if buy_increment is None: - log.info('the rsi is too high to consider buying {}'.format(rsi)) return if price * buy_increment > cash: @@ -117,11 +117,13 @@ def _handle_data(context, data): cost_basis ) ) + limit_price = price * (1 + context.SLIPPAGE_ALLOWED) order( asset=context.asset, amount=buy_increment, - limit_price=price * (1 + context.SLIPPAGE_ALLOWED) + limit_price=limit_price ) + pass def handle_data(context, data): diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index 20c3f312..466e3e50 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -448,7 +448,7 @@ class Bitfinex(Exchange): order_statuses['message']) ) - orders = list() + orders = [] for order_status in order_statuses: order, executed_price = self._create_order(order_status) if asset is None or asset == order.sid: diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py index 3968370d..e7c35a1b 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/data_portal_exchange.py @@ -15,13 +15,12 @@ import abc import os from time import sleep -import collections import pandas as pd from catalyst.assets._assets import TradingPair from logbook import Logger -from catalyst.data.bundles.core import load, from_bundle_ingest_dirname, \ - BundleData, minute_path, five_minute_path, daily_path +from catalyst.data.bundles.core import from_bundle_ingest_dirname, \ + minute_path, five_minute_path, daily_path from catalyst.data.data_portal import DataPortal from catalyst.data.five_minute_bars import BcolzFiveMinuteBarReader from catalyst.data.minute_bars import BcolzMinuteBarReader @@ -200,6 +199,9 @@ class DataPortalExchangeBase(DataPortal): ) def get_spot_value(self, assets, field, dt, data_frequency): + if field == 'price': + field = 'close' + return self._get_spot_value(assets, field, dt, data_frequency) @abc.abstractmethod @@ -263,20 +265,29 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): if time_folder is None: raise BundleNotFoundError(exchange=exchange_name) - self.daily_bar_readers[exchange_name] = \ - BcolzDailyBarReader( - daily_path(name, time_folder), - ) + try: + self.daily_bar_readers[exchange_name] = \ + BcolzDailyBarReader( + daily_path(name, time_folder), + ) + except IOError: + self.daily_bar_readers[exchange_name] = None - self.five_minute_bar_readers[exchange_name] = \ - BcolzFiveMinuteBarReader( - five_minute_path(name, time_folder), - ) + try: + self.five_minute_bar_readers[exchange_name] = \ + BcolzFiveMinuteBarReader( + five_minute_path(name, time_folder), + ) + except IOError: + self.five_minute_bar_readers[exchange_name] = None - self.minute_bar_readers[exchange_name] = \ - BcolzMinuteBarReader( - minute_path(name, time_folder), - ) + try: + self.minute_bar_readers[exchange_name] = \ + BcolzMinuteBarReader( + minute_path(name, time_folder), + ) + except IOError: + self.minute_bar_readers[exchange_name] = None @staticmethod def find_most_recent_time(bundle_name): @@ -309,6 +320,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): field, data_frequency, ffill=True): + # TODO: implement in the bundle df = exchange.get_history_window( assets, end_dt, diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index bddbf2f0..c9db3a12 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -35,12 +35,15 @@ from catalyst.exchange.exchange_errors import ( ExchangePortfolioDataError, ExchangeTransactionError, OrphanOrderError) +from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ + ExchangeLimitOrder, ExchangeStopOrder from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root, \ save_algo_object, get_algo_object, get_algo_folder, get_algo_df, \ save_algo_df from catalyst.exchange.live_graph_clock import LiveGraphClock from catalyst.exchange.simple_clock import SimpleClock from catalyst.exchange.stats_utils import get_pretty_stats +from catalyst.finance.execution import MarketOrder from catalyst.finance.performance.period import calc_period_stats from catalyst.gens.tradesimulation import AlgorithmSimulator from catalyst.utils.api_support import ( @@ -198,6 +201,43 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase): ) log.info('initialized trading algorithm in backtest mode') + def _calculate_order(self, asset, amount, + limit_price=None, stop_price=None, style=None): + # Raises a ZiplineError if invalid parameters are detected. + self.validate_order_params(asset, + amount, + limit_price, + stop_price, + style) + + # Convert deprecated limit_price and stop_price parameters to use + # ExecutionStyle objects. + style = self.__convert_order_params_for_blotter(limit_price, + stop_price, + style) + return amount, style + + @staticmethod + def __convert_order_params_for_blotter(limit_price, stop_price, style): + """ + Helper method for converting deprecated limit_price and stop_price + arguments into ExecutionStyle instances. + + This function assumes that either style == None or (limit_price, + stop_price) == (None, None). + """ + if style: + assert (limit_price, stop_price) == (None, None) + return style + if limit_price and stop_price: + return ExchangeStopLimitOrder(limit_price, stop_price) + if limit_price: + return ExchangeLimitOrder(limit_price) + if stop_price: + return ExchangeStopOrder(stop_price) + else: + return MarketOrder() + class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): def __init__(self, *args, **kwargs): diff --git a/catalyst/exchange/exchange_blotter.py b/catalyst/exchange/exchange_blotter.py index dd771cca..58994030 100644 --- a/catalyst/exchange/exchange_blotter.py +++ b/catalyst/exchange/exchange_blotter.py @@ -1,8 +1,141 @@ +from logbook import Logger + from catalyst.finance.blotter import Blotter -from catalyst.finance.commission import PerShare -from catalyst.finance.slippage import VolumeShareSlippage +from catalyst.finance.commission import PerShare, CommissionModel +from catalyst.finance.slippage import VolumeShareSlippage, SlippageModel, \ + LiquidityExceeded from catalyst.assets._assets import TradingPair +# It seems like we need to accept greate slippage risk in cryptos +# Orders won't often close at Equity levels. +# TODO: consider adjusting dynamically based on trading pair +from catalyst.finance.transaction import Transaction + +log = Logger('exchange_blotter') + +DEFAULT_SLIPPAGE_SPREAD = 0.02 +DEFAULT_MAKER_FEE = 0.001 +DEFAULT_TAKER_FEE = 0.002 + + +class TradingPairFeeSchedule(CommissionModel): + """ + Calculates a commission for a transaction based on a per percentage fee. + + Parameters + ---------- + fee : float, optional + The percentage fee. + """ + + def __init__(self, + maker_fee=DEFAULT_MAKER_FEE, + taker_fee=DEFAULT_TAKER_FEE): + self.maker_fee = maker_fee + self.taker_fee = taker_fee + + def __repr__(self): + return ( + '{class_name}(maker_fee={maker_fee}, ' + 'taker_fee={taker_fee})' + .format( + class_name=self.__class__.__name__, + maker_fee=self.maker_fee, + taker_fee=self.taker_fee, + ) + ) + + def calculate(self, order, transaction): + """ + Calculate the final fee based on the order parameters. + + :param order: + :param transaction: + + :return float: + The total commission. + """ + cost = abs(transaction.amount) * transaction.price + + # Assuming just the taker fee for now + fee = cost * self.taker_fee + return fee + + +class TradingPairFixedSlippage(SlippageModel): + """ + Model slippage as a fixed spread. + + Parameters + ---------- + spread : float, optional + spread / 2 will be added to buys and subtracted from sells. + """ + + def __init__(self, spread=DEFAULT_SLIPPAGE_SPREAD): + super(TradingPairFixedSlippage, self).__init__() + self.spread = spread + + def __repr__(self): + return '{class_name}(spread={spread})'.format( + class_name=self.__class__.__name__, spread=self.spread, + ) + + def simulate(self, data, asset, orders_for_asset): + self._volume_for_bar = 0 + volume = data.current(asset, "volume") + + if volume == 0: + return + + # can use the close price, since we verified there's volume in this + # bar. + price = data.current(asset, "close") + dt = data.current_dt + + for order in orders_for_asset: + if order.open_amount == 0: + continue + + order.check_triggers(price, dt) + if not order.triggered: + continue + + transaction = None + try: + execution_price, execution_volume = \ + self.process_order(data, order) + + if execution_price is not None: + transaction = Transaction( + asset=order.asset, + amount=abs(execution_volume), + dt=data.current_dt, + price=execution_price, + order_id=order.id + ) + + except LiquidityExceeded: + break + + if transaction: + self._volume_for_bar += abs(transaction.amount) + yield order, transaction + + def process_order(self, data, order): + price = data.current(order.asset, 'close') + + if order.amount > 0: + # Buy order + adj_price = price * (1 + self.spread) + else: + # Sell order + adj_price = price & (1 - self.spread) + + log.debug('added slippage to price: {} => {}'.format(price, adj_price)) + + return (adj_price, order.amount) + class ExchangeBlotter(Blotter): def __init__(self, *args, **kwargs): @@ -12,8 +145,8 @@ class ExchangeBlotter(Blotter): # We may be able to define more sophisticated models based on the fee # structure of each exchange. self.slippage_models = { - TradingPair: VolumeShareSlippage() + TradingPair: TradingPairFixedSlippage() } self.commission_models = { - TradingPair: PerShare() + TradingPair: TradingPairFeeSchedule() } diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 056a6421..a02363fd 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -64,6 +64,7 @@ def process_bar_data(exchange, assets, writer, data_frequency, show_progress, start, end): open_calendar = get_calendar('OPEN') + writer.default_ohlc_ratio = 1000000 writer.calendar = open_calendar writer.minutes_per_day = 1440 writer.write_metadata = True diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 7cf236a2..92a1ff30 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -16,12 +16,12 @@ class ExchangeBundleTestCase: def test_ingest(self): exchange_name = 'bitfinex' - start = pd.Timestamp.utcnow() - timedelta(days=365) + start = pd.to_datetime('2017-09-01', utc=True) end = pd.Timestamp.utcnow() open_calendar = get_calendar('OPEN') root = data_root(os.environ) - output_dir = '{root}/exchange_{exchange}/test'.format( + output_dir = '{root}/exchange_{exchange}/2017-09-21T05;34;37.274482'.format( root=root, exchange=exchange_name ) @@ -40,7 +40,8 @@ class ExchangeBundleTestCase: minutes_per_day=1440, start_session=start_session, end_session=end, - write_metadata=True + write_metadata=True, + default_ohlc_ratio=1000000 ) ingest = exchange_bundle(