From 4e2d092123cf983e1a83b324196744b0f38ef184 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Wed, 20 Sep 2017 18:00:08 -0400 Subject: [PATCH] Trying to fix an issue with periodical bars --- catalyst/exchange/bitfinex/bitfinex.py | 20 +- catalyst/exchange/data_portal_exchange.py | 27 ++- catalyst/exchange/exchange_algorithm.py | 152 +++++++------- catalyst/exchange/exchange_bundle.py | 101 +++++---- catalyst/utils/run_algo.py | 236 ++++++++-------------- tests/exchange/test_data_portal.py | 4 +- 6 files changed, 263 insertions(+), 277 deletions(-) diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index cc34d436..20c3f312 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -232,7 +232,8 @@ class Bitfinex(Exchange): # TODO: fetch account data and keep in cache return None - def get_candles(self, data_frequency, assets, bar_count=None, end_dt=None): + def get_candles(self, data_frequency, assets, bar_count=None, + start_dt=None, end_dt=None): """ Retrieve OHLVC candles from Bitfinex @@ -289,11 +290,18 @@ class Bitfinex(Exchange): is_list = True url += '/hist?limit={}'.format(int(bar_count)) - if end_dt is not None: + def get_ms(date): epoch = datetime.datetime.utcfromtimestamp(0) epoch = epoch.replace(tzinfo=pytz.UTC) - end_ms = (end_dt - epoch).total_seconds() * 1000.0 + return (date - epoch).total_seconds() * 1000.0 + + if start_dt is not None: + start_ms = get_ms(start_dt) + url += '&start={0:f}'.format(start_ms) + + if end_dt is not None: + end_ms = get_ms(end_dt) url += '&end={0:f}'.format(end_ms) else: @@ -315,6 +323,9 @@ class Bitfinex(Exchange): candles = response.json() def ohlc_from_candle(candle): + last_traded = pd.Timestamp.utcfromtimestamp( + candle[0] / 1000.0) + last_traded = last_traded.replace(tzinfo=pytz.UTC) ohlc = dict( open=np.float64(candle[1]), high=np.float64(candle[3]), @@ -322,8 +333,7 @@ class Bitfinex(Exchange): close=np.float64(candle[2]), volume=np.float64(candle[5]), price=np.float64(candle[2]), - last_traded=pd.Timestamp.utcfromtimestamp( - candle[0] / 1000.0) + last_traded=last_traded ) return ohlc diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py index c9dc5bd0..3968370d 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/data_portal_exchange.py @@ -331,20 +331,29 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): else: raise ValueError('Unsupported frequency') - values = [] - for asset in assets: + if isinstance(assets, TradingPair): try: value = reader.get_value( - sid=asset.sid, + sid=assets.sid, dt=dt, field=field ) - values.append(value) + return value except Exception as e: log.warn('minute data not found: {}'.format(e)) - values.append(None) - - if len(assets) == 1: - return values[0] + return None else: - return values + values = [] + for asset in assets: + try: + value = reader.get_value( + sid=asset.sid, + dt=dt, + field=field + ) + values.append(value) + except Exception as e: + log.warn('minute data not found: {}'.format(e)) + values.append(None) + + return values diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index 030a3341..d20bed49 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -49,7 +49,7 @@ from catalyst.utils.input_validation import error_keywords, ensure_upper_case, \ expect_types from catalyst.utils.preprocess import preprocess -log = logbook.Logger("ExchangeTradingAlgorithm") +log = logbook.Logger('exchange_algorithm') class ExchangeAlgorithmExecutor(AlgorithmSimulator): @@ -59,6 +59,8 @@ class ExchangeAlgorithmExecutor(AlgorithmSimulator): class ExchangeTradingAlgorithmBase(TradingAlgorithm): def __init__(self, *args, **kwargs): + self.exchanges = kwargs.pop('exchanges', None) + super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs) @api_method @@ -106,10 +108,83 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): as_of_date=_lookup_date ) + def prepare_period_stats(self, start_dt, end_dt): + """ + Creates a dictionary representing the state of the tracker. -class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase): + + I rewrote this in an attempt to better control the stats. + I don't want things to happen magically through complex logic + pertaining to backtesting. + + """ + tracker = self.perf_tracker + period = tracker.todays_performance + + pos_stats = period.position_tracker.stats() + period_stats = calc_period_stats(pos_stats, period.ending_cash) + + stats = dict( + period_start=tracker.period_start, + period_end=tracker.period_end, + capital_base=tracker.capital_base, + progress=tracker.progress, + ending_value=period.ending_value, + ending_exposure=period.ending_exposure, + capital_used=period.cash_flow, + starting_value=period.starting_value, + starting_exposure=period.starting_exposure, + starting_cash=period.starting_cash, + ending_cash=period.ending_cash, + portfolio_value=period.ending_cash + period.ending_value, + pnl=period.pnl, + returns=period.returns, + period_open=period.period_open, + period_close=period.period_close, + gross_leverage=period_stats.gross_leverage, + net_leverage=period_stats.net_leverage, + short_exposure=pos_stats.short_exposure, + long_exposure=pos_stats.long_exposure, + short_value=pos_stats.short_value, + long_value=pos_stats.long_value, + longs_count=pos_stats.longs_count, + shorts_count=pos_stats.shorts_count, + ) + + # Merging cumulative risk + stats.update(tracker.cumulative_risk_metrics.to_dict()) + + # Merging latest recorded variables + stats.update(self.recorded_vars) + + stats['positions'] = period.position_tracker.get_positions_list() + + # we want the key to be absent, not just empty + # Only include transactions for given dt + stats['transactions'] = dict() + for date in period.processed_transactions: + if start_dt <= date < end_dt: + stats['transactions'][date] = \ + period.processed_transactions[date] + + stats['orders'] = dict() + for date in period.orders_by_modified: + if start_dt <= date < end_dt: + stats['orders'][date] = \ + period.orders_by_modified[date] + + return stats + + +class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase): + def __init__(self, *args, **kwargs): + super(ExchangeTradingAlgorithmBacktest, self).__init__(*args, **kwargs) + + log.info('initialized trading algorithm in backtest mode') + + +class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): def __init__(self, *args, **kwargs): - self.exchanges = kwargs.pop('exchanges', None) self.algo_namespace = kwargs.pop('algo_namespace', None) self.live_graph = kwargs.pop('live_graph', None) @@ -134,13 +209,13 @@ class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase): self.stats_minutes = 5 - super(ExchangeTradingAlgorithm, self).__init__(*args, **kwargs) + super(ExchangeTradingAlgorithmLive, self).__init__(*args, **kwargs) # TODO: fix precision before re-enabling # self._create_minute_writer() signal.signal(signal.SIGINT, self.signal_handler) - log.info('exchange trading algorithm successfully initialized') + log.info('initialized trading algorithm in live mode') def _create_minute_writer(self): root = get_exchange_minute_writer_root(self.exchange.name) @@ -360,73 +435,6 @@ class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase): save_algo_df(self.algo_namespace, 'exposure_stats', self.exposure_stats) - def prepare_period_stats(self, start_dt, end_dt): - """ - Creates a dictionary representing the state of the tracker. - - - I rewrote this in an attempt to better control the stats. - I don't want things to happen magically through complex logic - pertaining to backtesting. - - """ - tracker = self.perf_tracker - period = tracker.todays_performance - - pos_stats = period.position_tracker.stats() - period_stats = calc_period_stats(pos_stats, period.ending_cash) - - stats = dict( - period_start=tracker.period_start, - period_end=tracker.period_end, - capital_base=tracker.capital_base, - progress=tracker.progress, - ending_value=period.ending_value, - ending_exposure=period.ending_exposure, - capital_used=period.cash_flow, - starting_value=period.starting_value, - starting_exposure=period.starting_exposure, - starting_cash=period.starting_cash, - ending_cash=period.ending_cash, - portfolio_value=period.ending_cash + period.ending_value, - pnl=period.pnl, - returns=period.returns, - period_open=period.period_open, - period_close=period.period_close, - gross_leverage=period_stats.gross_leverage, - net_leverage=period_stats.net_leverage, - short_exposure=pos_stats.short_exposure, - long_exposure=pos_stats.long_exposure, - short_value=pos_stats.short_value, - long_value=pos_stats.long_value, - longs_count=pos_stats.longs_count, - shorts_count=pos_stats.shorts_count, - ) - - # Merging cumulative risk - stats.update(tracker.cumulative_risk_metrics.to_dict()) - - # Merging latest recorded variables - stats.update(self.recorded_vars) - - stats['positions'] = period.position_tracker.get_positions_list() - - # we want the key to be absent, not just empty - # Only include transactions for given dt - stats['transactions'] = dict() - for date in period.processed_transactions: - if start_dt <= date < end_dt: - stats['transactions'][date] = \ - period.processed_transactions[date] - - stats['orders'] = dict() - for date in period.orders_by_modified: - if start_dt <= date < end_dt: - stats['orders'][date] = \ - period.orders_by_modified[date] - - return stats - def handle_data(self, data): if not self.is_running: return diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 2b4c6435..147c4d9d 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,17 +1,12 @@ from datetime import timedelta -from time import sleep - -import os import pandas as pd -from catalyst.data.bundles.base_pricing import BaseCryptoPricingBundle - -from catalyst import get_calendar +import numpy as np from logbook import Logger, INFO +from catalyst import get_calendar from catalyst.data.five_minute_bars import BcolzFiveMinuteOverlappingData -from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ - BcolzMinuteBarReader +from catalyst.data.minute_bars import BcolzMinuteOverlappingData from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.bittrex.bittrex import Bittrex from catalyst.exchange.exchange_errors import ExchangeNotFoundError @@ -27,10 +22,12 @@ log = Logger('exchange_bundle') def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count): + calc_start_dt = end_dt - timedelta(minutes=bar_count) candles = exchange.get_candles( data_frequency=data_frequency, assets=assets, bar_count=bar_count, + start_dt=calc_start_dt, end_dt=end_dt ) @@ -39,10 +36,34 @@ def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count): for asset in assets: asset_candles = candles[asset] + candle_start_dt = None + candle_end_dt = None + for candle in asset_candles: + last_traded = candle['last_traded'] + + if candle_start_dt is None or candle_start_dt > last_traded: + candle_start_dt = last_traded + + if candle_end_dt is None or candle_end_dt < last_traded: + candle_end_dt = last_traded + + if candle_end_dt < end_dt: + asset_candles.append( + dict( + open=None, + high=None, + close=None, + low=None, + volume=None, + last_traded=end_dt + ) + ) + asset_df = pd.DataFrame(asset_candles) if not asset_df.empty: asset_df.set_index('last_traded', inplace=True, drop=True) asset_df.sort_index(inplace=True) + asset_df = asset_df.resample('1T').ffill() series[asset] = asset_df @@ -77,12 +98,13 @@ def process_bar_data(exchange, assets, writer, data_frequency, bar_count = exchange.num_candles_limit chunks = [] - last_chunk_date = end + last_chunk_date = end.floor('1 min') while last_chunk_date > start + timedelta(minutes=bar_count): # TODO: account for the partial last bar chunk = dict(end=last_chunk_date, bar_count=bar_count) chunks.append(chunk) + # TODO: base on frequency last_chunk_date = \ last_chunk_date - timedelta(minutes=(bar_count + 1)) @@ -119,20 +141,29 @@ def process_bar_data(exchange, assets, writer, data_frequency, ) continue + num_candles = 0 data = [] for asset in assets_candles_dict: df = assets_candles_dict[asset] sid = asset.sid + + num_candles += len(df.values) data.append((sid, df)) try: - log.debug( - 'writing chunk {start} to {end}'.format( - start=chunk['end'] - timedelta( - minutes=chunk['bar_count']), + log.info( + 'writing {num_candles} candles from {start} to {end}'.format( + num_candles=num_candles, + start=chunk['end'] - \ + timedelta(minutes=chunk['bar_count']), end=chunk['end'] ) ) + + for pair in data: + log.info('data for sid {}\n{}\n{}'.format( + pair[0], pair[1].head(2), pair[1].tail(2))) + writer.write( data=data, show_progress=False, @@ -258,27 +289,27 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None, if start >= end: raise ValueError('start date cannot be after end date') - if daily_bar_writer is not None: - process_bar_data( - exchange=exchange, - assets=assets, - writer=daily_bar_writer, - data_frequency='daily', - show_progress=show_progress, - start=start, - end=end - ) - - if five_minute_bar_writer is not None: - process_bar_data( - exchange=exchange, - assets=assets, - writer=five_minute_bar_writer, - data_frequency='5-minute', - show_progress=show_progress, - start=start, - end=end - ) + # if daily_bar_writer is not None: + # process_bar_data( + # exchange=exchange, + # assets=assets, + # writer=daily_bar_writer, + # data_frequency='daily', + # show_progress=show_progress, + # start=start, + # end=end + # ) + # + # if five_minute_bar_writer is not None: + # process_bar_data( + # exchange=exchange, + # assets=assets, + # writer=five_minute_bar_writer, + # data_frequency='5-minute', + # show_progress=show_progress, + # start=start, + # end=end + # ) if minute_bar_writer is not None: process_bar_data( @@ -292,5 +323,3 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None, ) return ingest - - diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 6877bc54..401bd2e2 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -1,14 +1,12 @@ import os -import re -from runpy import run_path import sys import warnings -from time import sleep from datetime import timedelta - -import pandas as pd +from runpy import run_path +from time import sleep import click +import pandas as pd from catalyst.exchange.bittrex.bittrex import Bittrex @@ -23,22 +21,15 @@ except: from toolz import valfilter, concatv from functools import partial -from catalyst.algorithm import TradingAlgorithm -from catalyst.data.bundles.core import load -from catalyst.data.data_portal import DataPortal -from catalyst.data.loader import load_crypto_market_data from catalyst.finance.trading import TradingEnvironment -from catalyst.pipeline.data import USEquityPricing, CryptoPricing -from catalyst.pipeline.loaders import ( - USEquityPricingLoader, - CryptoPricingLoader, -) from catalyst.utils.calendars import get_calendar from catalyst.utils.factory import create_simulation_parameters import catalyst.utils.paths as pth -from catalyst.exchange.exchange_algorithm import ExchangeTradingAlgorithm -from catalyst.exchange.data_portal_exchange import DataPortalExchangeLive +from catalyst.exchange.exchange_algorithm import ExchangeTradingAlgorithmLive, \ + ExchangeTradingAlgorithmBacktest +from catalyst.exchange.data_portal_exchange import DataPortalExchangeLive, \ + DataPortalExchangeBacktest from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.exchange_portfolio import ExchangePortfolio @@ -148,49 +139,46 @@ def _run(handle_data, mode = 'live' if live else 'backtest' log.info('running algo in {mode} mode'.format(mode=mode)) - if live and exchange is not None: - exchange_name = exchange + exchange_name = exchange + if exchange_name is None: + raise ValueError('Please specify at least one exchange.') - start = pd.Timestamp.utcnow() - # TODO: fix the end data. - end = start + timedelta(hours=8760) + exchange_list = [x.strip().lower() for x in exchange.split(',')] - exchange_list = [x.strip().lower() for x in exchange.split(',')] + exchanges = dict() + for exchange_name in exchange_list: - exchanges = dict() - for exchange_name in exchange_list: + # Looking for the portfolio from the cache first + portfolio = get_algo_object( + algo_name=algo_namespace, + key='portfolio_{}'.format(exchange_name), + environ=environ + ) - # Looking for the portfolio from the cache first - portfolio = get_algo_object( - algo_name=algo_namespace, - key='portfolio_{}'.format(exchange_name), - environ=environ + if portfolio is None: + portfolio = ExchangePortfolio( + start_date=pd.Timestamp.utcnow() ) - if portfolio is None: - portfolio = ExchangePortfolio( - start_date=pd.Timestamp.utcnow() - ) - - # This corresponds to the json file containing api token info - exchange_auth = get_exchange_auth(exchange_name) - if exchange_name == 'bitfinex': - exchanges[exchange_name] = Bitfinex( - key=exchange_auth['key'], - secret=exchange_auth['secret'], - base_currency=base_currency, - portfolio=portfolio - ) - elif exchange_name == 'bittrex': - exchanges[exchange_name] = Bittrex( - key=exchange_auth['key'], - secret=exchange_auth['secret'], - base_currency=base_currency, - portfolio=portfolio - ) - else: - raise ExchangeNotFoundError(exchange_name=exchange_name) + # This corresponds to the json file containing api token info + exchange_auth = get_exchange_auth(exchange_name) + if exchange_name == 'bitfinex': + exchanges[exchange_name] = Bitfinex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=base_currency, + portfolio=portfolio + ) + elif exchange_name == 'bittrex': + exchanges[exchange_name] = Bittrex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=base_currency, + portfolio=portfolio + ) + else: + raise ExchangeNotFoundError(exchange_name=exchange_name) open_calendar = get_calendar('OPEN') sim_params = create_simulation_parameters( @@ -201,13 +189,19 @@ def _run(handle_data, emission_rate=data_frequency, ) - if live and exchange is not None: - env = TradingEnvironment( - environ=environ, - exchange_tz='UTC', - asset_db_path=None - ) - env.asset_finder = AssetFinderExchange() + env = TradingEnvironment( + environ=environ, + exchange_tz='UTC', + asset_db_path=None # We don't need an asset db, we have exchanges + ) + env.asset_finder = AssetFinderExchange() + choose_loader = None # TODO: use the DataPortal for in the algorithm class for this + + if live: + start = pd.Timestamp.utcnow() + + # TODO: fix the end data. + end = start + timedelta(hours=8760) data = DataPortalExchangeLive( exchanges=exchanges, @@ -215,7 +209,6 @@ def _run(handle_data, trading_calendar=open_calendar, first_trading_day=pd.to_datetime('today', utc=True) ) - choose_loader = None def fetch_capital_base(exchange, attempt_index=0): """ @@ -264,102 +257,34 @@ def _run(handle_data, data_frequency='minute' ) - elif bundle is not None: - bundles = bundle.split(',') - - def get_trading_env_and_data(bundles): - env = data = None - - b = 'poloniex' - if len(bundles) == 0: - return env, data - elif len(bundles) == 1: - b = bundles[0] - - bundle_data = load( - b, - environ, - bundle_timestamp, - ) - - prefix, connstr = re.split( - r'sqlite:///', - str(bundle_data.asset_finder.engine.url), - maxsplit=1, - ) - if prefix: - raise ValueError( - "invalid url %r, must begin with 'sqlite:///'" % - str(bundle_data.asset_finder.engine.url), - ) - - env = TradingEnvironment( - load=partial(load_crypto_market_data, bundle=b, - bundle_data=bundle_data, environ=environ), - bm_symbol='USDT_BTC', - trading_calendar=open_calendar, - asset_db_path=connstr, - environ=environ, - ) - - first_trading_day = bundle_data.minute_bar_reader.first_trading_day - - data = DataPortal( - env.asset_finder, - open_calendar, - first_trading_day=first_trading_day, - minute_reader=bundle_data.minute_bar_reader, - five_minute_reader=bundle_data.five_minute_bar_reader, - daily_reader=bundle_data.daily_bar_reader, - adjustment_reader=bundle_data.adjustment_reader, - ) - - return env, data - - def get_loader_for_bundle(b): - bundle_data = load( - b, - environ, - bundle_timestamp, - ) - - if b == 'poloniex': - return CryptoPricingLoader( - bundle_data, - data_frequency, - CryptoPricing, - ) - elif b == 'quandl': - return USEquityPricingLoader( - bundle_data, - data_frequency, - USEquityPricing, - ) - raise ValueError( - "No PipelineLoader registered for bundle %s." % b - ) - - loaders = [get_loader_for_bundle(b) for b in bundles] - env, data = get_trading_env_and_data(bundles) - - def choose_loader(column): - for loader in loaders: - if column in loader.columns: - return loader - raise ValueError( - "No PipelineLoader registered for column %s." % column - ) - + algorithm_class = partial( + ExchangeTradingAlgorithmLive, + exchanges=exchanges, + algo_namespace=algo_namespace, + live_graph=live_graph + ) else: - env = TradingEnvironment(environ=environ) - choose_loader = None + # Removed the existing Poloniex fork to keep things simple + # We can add back the complexity if required. - TradingAlgorithmClass = ( - partial(ExchangeTradingAlgorithm, exchanges=exchanges, - algo_namespace=algo_namespace, live_graph=live_graph) - if live and exchanges else TradingAlgorithm) # TODO: backtest trading algo class + # I don't think that we should have arbitrary price data bundles + # Instead, we should center this data around exchanges. + # We still need to support bundles for other misc data, but we + # can handle this later. - perf = TradingAlgorithmClass( + data = DataPortalExchangeBacktest( + exchanges=exchanges, + asset_finder=env.asset_finder, + trading_calendar=open_calendar, + first_trading_day=start, + ) + + algorithm_class = partial( + ExchangeTradingAlgorithmBacktest, + exchanges=exchanges + ) + + perf = algorithm_class( namespace=namespace, env=env, get_pipeline_loader=choose_loader, @@ -530,6 +455,11 @@ def run_algorithm(initialize, """ load_extensions(default_extension, extensions, strict_extensions, environ) + # I'm not sure that we need this since the modified DataPortal + # does not require extensions to be explicitly loaded. + + # This will be useful for arbitrary non-pricing bundles but we may + # need to modify the logic. if not live: non_none_data = valfilter(bool, { 'data': data is not None, diff --git a/tests/exchange/test_data_portal.py b/tests/exchange/test_data_portal.py index 0ea2b8e9..87357c2b 100644 --- a/tests/exchange/test_data_portal.py +++ b/tests/exchange/test_data_portal.py @@ -50,7 +50,7 @@ class ExchangeDataPortalTestCase: exchanges=dict(bitfinex=self.bitfinex), asset_finder=asset_finder, trading_calendar=open_calendar, - first_trading_day=pd.to_datetime('today', utc=True) + first_trading_day=pd.to_datetime('2017-09-10', utc=True) ) def test_get_history_window_live(self): @@ -90,7 +90,7 @@ class ExchangeDataPortalTestCase: asset_finder.lookup_symbol('neo_btc', self.bitfinex), ] - date = pd.Timestamp.utcnow() - timedelta(hours=8) + date = pd.to_datetime('2017-09-10 9:00', utc=True) value = self.data_portal_backtest.get_spot_value( assets, 'close', date, 'minute') pass