From 5e4ad9b33842467d8017487ce41fdca1cf2cc3e3 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Thu, 2 Nov 2017 20:18:34 -0400 Subject: [PATCH] BUG: accounting for daily historical bars with minute freq algo --- catalyst/examples/simple_loop.py | 24 +++-- catalyst/exchange/exchange_bundle.py | 20 ++-- catalyst/exchange/exchange_data_portal.py | 7 +- tests/exchange/test_bundle.py | 15 ++- tests/exchange/test_server_bundle.py | 124 ++++++++++++++++++++++ 5 files changed, 162 insertions(+), 28 deletions(-) create mode 100644 tests/exchange/test_server_bundle.py diff --git a/catalyst/examples/simple_loop.py b/catalyst/examples/simple_loop.py index 9f02e5ac..1deedf92 100644 --- a/catalyst/examples/simple_loop.py +++ b/catalyst/examples/simple_loop.py @@ -16,26 +16,28 @@ def handle_data(context, data): price = data.current(context.asset, 'close') print('got price {price}'.format(price=price)) - # prices = data.history( - # context.asset, - # fields='price', - # bar_count=20, - # frequency='1T' - # ) - # rsi = talib.RSI(prices.values, timeperiod=14)[-1] - # print('got rsi: {}'.format(rsi)) - pass + try: + prices = data.history( + context.asset, + fields='price', + bar_count=16, + frequency='1D' + ) + rsi = talib.RSI(prices.values, timeperiod=14)[-1] + print('got rsi: {}'.format(rsi)) + except Exception as e: + print(e) run_algorithm( capital_base=250, start=pd.to_datetime('2017-1-1', utc=True), end=pd.to_datetime('2017-10-22', utc=True), - data_frequency='daily', + data_frequency='minute', initialize=initialize, handle_data=handle_data, analyze=None, - exchange_name='poloniex', + exchange_name='bitfinex', algo_namespace='simple_loop', base_currency='btc' ) diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 38de8e9e..f5df0f8e 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -284,7 +284,7 @@ class ExchangeBundle: self._write(data, writer, data_frequency) - def ingest_ctable(self, 
asset, data_frequency, period, start_dt, end_dt, + def ingest_ctable(self, asset, data_frequency, period, writer, empty_rows_behavior='strip', cleanup=False): """ Merge a ctable bundle chunk into the main bundle for the exchange. @@ -315,6 +315,12 @@ class ExchangeBundle: if reader is None: raise TempBundleNotFoundError(path=path) + start_dt = reader.first_trading_day + end_dt = reader.last_available_dt + + if data_frequency == 'daily': + end_dt = end_dt - pd.Timedelta(hours=23, minutes=59) + arrays = None try: arrays = reader.load_raw_arrays( @@ -420,6 +426,12 @@ class ExchangeBundle: dict[TradingPair, list[dict(str, Object]]] """ + get_start_end = get_month_start_end \ + if data_frequency == 'minute' else get_year_start_end + + start_dt, _ = get_start_end(start_dt) + _, end_dt = get_start_end(end_dt) + reader = self.get_reader(data_frequency) chunks = dict() @@ -450,8 +462,6 @@ class ExchangeBundle: chunks[asset] = [] for index, dt in enumerate(dates): - get_start_end = get_month_start_end \ - if data_frequency == 'minute' else get_year_start_end period_start, period_end = get_start_end( dt=dt, @@ -543,8 +553,6 @@ class ExchangeBundle: asset=chunk['asset'], data_frequency=data_frequency, period=chunk['period'], - start_dt=chunk['period_start'], - end_dt=chunk['period_end'], writer=writer, empty_rows_behavior='strip', cleanup=True @@ -563,8 +571,6 @@ class ExchangeBundle: asset=chunk['asset'], data_frequency=data_frequency, period=chunk['period'], - start_dt=chunk['period_start'], - end_dt=chunk['period_end'], writer=writer, empty_rows_behavior='strip', cleanup=True diff --git a/catalyst/exchange/exchange_data_portal.py b/catalyst/exchange/exchange_data_portal.py index 6965fe24..a6cf4db7 100644 --- a/catalyst/exchange/exchange_data_portal.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -328,17 +328,20 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): """ bundle = self.exchange_bundles[exchange.name] # type: ExchangeBundle - freq, candle_size, 
unit, data_frequency = get_frequency( + freq, candle_size, unit, adj_data_frequency = get_frequency( frequency, data_frequency ) adj_bar_count = candle_size * bar_count + if data_frequency == 'minute' and adj_data_frequency == 'daily': + end_dt = end_dt.floor('1D') + series = bundle.get_history_window_series_and_load( assets=assets, end_dt=end_dt, bar_count=adj_bar_count, field=field, - data_frequency=data_frequency, + data_frequency=adj_data_frequency, algo_end_dt=self._last_available_session, ) diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 2180dd03..c5e5feb9 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -431,7 +431,7 @@ class TestExchangeBundle: pass def bundle_to_csv(self): - exchange_name = 'poloniex' + exchange_name = 'bitfinex' data_frequency = 'daily' period = '2016' @@ -445,14 +445,13 @@ class TestExchangeBundle: data_frequency=data_frequency, period=period ) - - dt = pd.to_datetime(period, utc=True) - if data_frequency == 'minute': - start_dt, end_dt = get_month_start_end(dt) - else: - start_dt, end_dt = get_year_start_end(dt) - reader = bundle.get_reader(data_frequency, path=path) + start_dt = reader.first_trading_day + end_dt = reader.last_available_dt + + if data_frequency == 'daily': + end_dt = end_dt - pd.Timedelta(hours=23, minutes=59) + arrays = None try: arrays = reader.load_raw_arrays( diff --git a/tests/exchange/test_server_bundle.py b/tests/exchange/test_server_bundle.py new file mode 100644 index 00000000..50dd98d7 --- /dev/null +++ b/tests/exchange/test_server_bundle.py @@ -0,0 +1,124 @@ +import os +import tarfile +import importlib +import pandas as pd + +from catalyst import get_calendar + +from catalyst.exchange.exchange_bundle import ExchangeBundle +from catalyst.exchange.exchange_bcolz import BcolzExchangeBarReader +from catalyst.data.minute_bars import BcolzMinuteBarMetadata +from catalyst.exchange.bundle_utils import get_df_from_arrays, get_bcolz_chunk + +import 
matplotlib +import matplotlib.pyplot as plt +from matplotlib.finance import candlestick2_ohlc +from matplotlib.finance import volume_overlay +import matplotlib.ticker as ticker + +from catalyst.exchange.factory import get_exchange + +EXCHANGE_NAMES = ['bitfinex', 'bittrex', 'poloniex'] +exchanges = dict((e, getattr(importlib.import_module( + 'catalyst.exchange.{0}.{0}'.format(e)), e.capitalize())) + for e in EXCHANGE_NAMES) + + +class ValidateChunks(object): + def __init__(self): + self.columns = ['open', 'high', 'low', 'close', 'volume'] + + def chunk_to_df(self, exchange_name, symbol, data_frequency, period): + + exchange = get_exchange(exchange_name) + asset = exchange.get_asset(symbol) + + filename = get_bcolz_chunk( + exchange_name=exchange_name, + symbol=symbol, + data_frequency=data_frequency, + period=period + ) + + reader = BcolzExchangeBarReader(rootdir=filename, + data_frequency=data_frequency) + + # metadata = BcolzMinuteBarMetadata.read(filename) + + start = reader.first_trading_day + end = reader.last_available_dt + + if data_frequency == 'daily': + end = end - pd.Timedelta(hours=23, minutes=59) + + print start, end, data_frequency + + arrays = reader.load_raw_arrays(self.columns, start, end, + [asset.sid, ]) + + bundle = ExchangeBundle(exchange_name) + + periods = bundle.get_calendar_periods_range( + start, end, data_frequency + ) + + return get_df_from_arrays(arrays, periods) + + def plot_ohlcv(self, df): + + fig, ax = plt.subplots() + + # Plot the candlestick + candlestick2_ohlc(ax, df['open'], df['high'], df['low'], df['close'], + width=1, colorup='g', colordown='r', alpha=0.5) + + # shift y-limits of the candlestick plot so that there is space + # at the bottom for the volume bar chart + pad = 0.25 + yl = ax.get_ylim() + ax.set_ylim(yl[0] - (yl[1] - yl[0]) * pad, yl[1]) + + # Add a second axis for the volume overlay + ax2 = ax.twinx() + + ax2.set_position( + matplotlib.transforms.Bbox([[0.125, 0.1], [0.9, 0.26]])) + + # Plot the volume overlay + 
bc = volume_overlay(ax2, df['open'], df['close'], df['volume'], + colorup='g', alpha=0.5, width=1) + + ax.xaxis.set_major_locator(ticker.MaxNLocator(6)) + + def mydate(x, pos): + try: + return df.index[int(x)] + except IndexError: + return '' + + ax.xaxis.set_major_formatter(ticker.FuncFormatter(mydate)) + plt.margins(0) + plt.show() + + def plot(self, filename): + df = self.chunk_to_df(filename) + self.plot_ohlcv(df) + + def to_csv(self, filename): + df = self.chunk_to_df(filename) + df.to_csv(os.path.basename(filename).split('.')[0] + '.csv') + + +v = ValidateChunks() + +df = v.chunk_to_df( + exchange_name='bitfinex', + symbol='eth_btc', + data_frequency='daily', + period='2016' +) +print(df.tail()) +v.plot_ohlcv(df) +# v.plot( +# ex +# )