From fdc5a300602be66424ddc73b35b6c744be3fc80b Mon Sep 17 00:00:00 2001 From: fredfortier Date: Thu, 26 Oct 2017 02:33:17 -0400 Subject: [PATCH] Added data validation unit tests and minor fixes to the get_candles method of Poloniex. --- catalyst/exchange/bitfinex/bitfinex.py | 12 ++- catalyst/exchange/bittrex/bittrex.py | 34 ++++-- catalyst/exchange/exchange.py | 12 ++- catalyst/exchange/exchange_bcolz.py | 5 +- catalyst/exchange/exchange_bundle.py | 137 +++++++++++++++---------- catalyst/exchange/exchange_utils.py | 7 ++ catalyst/exchange/poloniex/poloniex.py | 28 +++-- tests/exchange/test_bittrex.py | 16 +-- tests/exchange/test_bundle.py | 72 +++++++++++-- 9 files changed, 232 insertions(+), 91 deletions(-) diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index 47e64cab..ea243a1f 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -23,7 +23,7 @@ from catalyst.exchange.exchange_errors import ( from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ ExchangeStopLimitOrder, ExchangeStopOrder from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \ - download_exchange_symbols + download_exchange_symbols, get_symbols_string from catalyst.finance.order import Order, ORDER_STATUS from catalyst.protocol import Account @@ -255,6 +255,16 @@ class Bitfinex(Exchange): '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', '1M' """ + log.debug( + 'retrieving {bars} {freq} candles on {exchange} from ' + '{end_dt} for markets {symbols}, '.format( + bars=bar_count, + freq=data_frequency, + exchange=self.name, + end_dt=end_dt, + symbols=get_symbols_string(assets) + ) + ) freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) if freq_match: diff --git a/catalyst/exchange/bittrex/bittrex.py b/catalyst/exchange/bittrex/bittrex.py index 1bca9422..1bbfc68b 100644 --- a/catalyst/exchange/bittrex/bittrex.py +++ b/catalyst/exchange/bittrex/bittrex.py @@ -1,6 +1,7 @@ import json import pandas as pd +import time from catalyst.assets._assets import TradingPair from logbook import Logger from six.moves import urllib @@ -13,10 +14,12 @@ from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ ExchangeRequestError, InvalidOrderStyle, OrderNotFound, OrderCancelError, \ CreateOrderError from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \ - download_exchange_symbols + download_exchange_symbols, get_symbols_string from catalyst.finance.execution import LimitOrder, StopLimitOrder from catalyst.finance.order import Order, ORDER_STATUS +# TODO: consider using this: https://github.com/mondeja/bittrex_v2 + log = Logger('Bittrex', level=LOG_LEVEL) URL2 = 'https://bittrex.com/Api/v2.0' @@ -217,10 +220,27 @@ class Bittrex(Exchange): :param data_frequency: :param assets: :param bar_count: + :param start_dt + :param end_dt :return: """ - log.info('retrieving candles') + # TODO: this has no effect at the moment + if end_dt is None: + end_dt = pd.Timestamp.utcnow() + + log.debug( + 'retrieving {bars} {freq} candles on {exchange} from ' + '{end_dt} for markets {symbols}, '.format( + bars=bar_count, + freq=data_frequency, + exchange=self.name, + end_dt=end_dt, + symbols=get_symbols_string(assets) + ) + ) + + data_frequency = data_frequency.lower() if data_frequency == 'minute' or data_frequency == '1m': frequency = 'oneMin' elif data_frequency == '5m': @@ -229,7 +249,7 @@ class Bittrex(Exchange): frequency = 'thirtyMin' elif data_frequency == '1h': frequency = 'hour' - elif data_frequency == 'daily' or data_frequency == '1D': + elif data_frequency == 'daily' or data_frequency == '1d': frequency = 'day' else: raise InvalidHistoryFrequencyError( @@ -238,13 +258,14 @@ class Bittrex(Exchange): # Making sure that assets are iterable asset_list = [assets] if isinstance(assets, TradingPair) else assets - ohlc_map = dict() for asset in asset_list: + end = int(time.mktime(end_dt.timetuple())) url = '{url}/pub/market/GetTicks?marketName={symbol}' \ - '&tickInterval={frequency}&_=1499127220008'.format( + '&tickInterval={frequency}&_={end}'.format( url=URL2, symbol=self.get_symbol(asset), - frequency=frequency + frequency=frequency, + end=end ) try: @@ -272,6 +293,7 @@ class Bittrex(Exchange): return ohlc ordered_candles = list(reversed(candles)) + ohlc_map = dict() if bar_count is None: ohlc_map[asset] = ohlc_from_candle(ordered_candles[0]) else: diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index bb4018d5..31037882 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -373,7 +373,7 @@ class Exchange: return value def get_series_from_candles(self, candles, start_dt, end_dt, - field, previous_value=None): + data_frequency, field, previous_value=None): """ Get a series of field data for the specified candles. @@ -388,9 +388,12 @@ class Exchange: dates = [candle['last_traded'] for candle in candles] values = [candle[field] for candle in candles] - periods = pd.date_range(start_dt, end_dt) + periods = self.bundle.get_calendar_periods_range( + start_dt, end_dt, data_frequency + ) series = pd.Series(values, index=dates) + #TODO: ensure that this working as expected, if not use fillna series.reindex(periods, method='ffill', fill_value=previous_value) return series @@ -487,6 +490,7 @@ class Exchange: data_frequency=data_frequency, assets=asset, bar_count=trailing_bar_count, + start_dt=start_dt, end_dt=end_dt ) @@ -497,6 +501,7 @@ class Exchange: candles=candles, start_dt=trailing_dt, end_dt=end_dt, + data_frequency=data_frequency, field=field, previous_value=last_value ) @@ -784,13 +789,14 @@ class Exchange: pass @abc.abstractmethod - def get_orderbook(self, asset, order_type): + def get_orderbook(self, asset, order_type, limit): """ Retrieve the the orderbook for the given trading pair. :param asset: TradingPair :param order_type: str The type of orders: bid, ask or all + :param limit :return: """ diff --git a/catalyst/exchange/exchange_bcolz.py b/catalyst/exchange/exchange_bcolz.py index 209f2d91..326745ac 100644 --- a/catalyst/exchange/exchange_bcolz.py +++ b/catalyst/exchange/exchange_bcolz.py @@ -16,7 +16,7 @@ class BcolzExchangeBarWriter(BcolzMinuteBarWriter): end_session = end_session.floor('1d') minutes_per_day = 1440 if self._data_frequency == 'minute' else 1 - default_ohlc_ratio = kwargs.pop('default_ohlc_ratio', 1000000) + default_ohlc_ratio = kwargs.pop('default_ohlc_ratio', 100000000) calendar = get_calendar('OPEN') super(BcolzExchangeBarWriter, self) \ @@ -79,8 +79,9 @@ class BcolzExchangeBarReader(BcolzMinuteBarReader): if mask is None: mask = a != 0 + inverse_ratio = self._ohlc_ratio_inverse_for_sid(sid) out[:len(mask), i][mask] = ( - a[mask] * self._ohlc_ratio_inverse_for_sid(sid) + a[mask] * inverse_ratio ) if field in fields: diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 5e015d37..3a738f9b 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -194,6 +194,71 @@ class ExchangeBundle: if data_frequency == 'minute' \ else self.calendar.sessions_in_range(start_dt, end_dt) + def ingest_df(self, ohlcv_df, data_frequency, asset, writer, + empty_rows_behavior='strip'): + """ + Ingest a DataFrame of OHLCV data for a given market. + + :param ohlcv_df: + :param data_frequency: + :param asset: + :param writer: + :param path: + :param empty_rows_behavior: + :return: + """ + if empty_rows_behavior is not 'ignore': + nan_rows = ohlcv_df[ohlcv_df.isnull().T.any().T].index + + if len(nan_rows) > 0: + dates = [] + previous_date = None + for row_date in nan_rows.values: + row_date = pd.to_datetime(row_date) + + if previous_date is None: + dates.append(row_date) + + else: + seq_date = previous_date + get_delta(1, data_frequency) + + if row_date > seq_date: + dates.append(previous_date) + dates.append(row_date) + + previous_date = row_date + + dates.append(pd.to_datetime(nan_rows.values[-1])) + + name = '{} from {} to {}'.format( + asset.symbol, ohlcv_df.index[0], ohlcv_df.index[-1] + ) + if empty_rows_behavior == 'warn': + log.warn( + '\n{name} with end minute {end_minute} has empty rows ' + 'in ranges: {dates}'.format( + name=name, + end_minute=asset.end_minute, + dates=dates + ) + ) + + elif empty_rows_behavior == 'raise': + raise EmptyValuesInBundleError( + name=name, + end_minute=asset.end_minute, + dates=dates + ) + else: + ohlcv_df.dropna(inplace=True) + + data = [] + if not ohlcv_df.empty: + ohlcv_df.sort_index(inplace=True) + data.append((asset.sid, ohlcv_df)) + + self._write(data, writer, data_frequency) + def ingest_ctable(self, asset, data_frequency, period, start_dt, end_dt, writer, empty_rows_behavior='strip', cleanup=False): """ @@ -242,62 +307,19 @@ class ExchangeBundle: periods = self.get_calendar_periods_range( start_dt, end_dt, data_frequency ) - df = get_df_from_arrays(arrays, periods) - - if empty_rows_behavior is not 'ignore': - nan_rows = df[df.isnull().T.any().T].index - - if len(nan_rows) > 0: - dates = [] - previous_date = None - for row_date in nan_rows.values: - row_date = pd.to_datetime(row_date) - - if previous_date is None: - dates.append(row_date) - - else: - seq_date = previous_date + get_delta(1, data_frequency) - - if row_date > seq_date: - dates.append(previous_date) - dates.append(row_date) - - previous_date = row_date - - dates.append(pd.to_datetime(nan_rows.values[-1])) - - name = path.split('/')[-1] - if empty_rows_behavior == 'warn': - log.warn( - '\n{name} with end minute {end_minute} has empty rows ' - 'in ranges: {dates}'.format( - name=name, - end_minute=asset.end_minute, - dates=dates - ) - ) - - elif empty_rows_behavior == 'raise': - raise EmptyValuesInBundleError( - name=name, - end_minute=asset.end_minute, - dates=dates - ) - else: - df.dropna(inplace=True) - - data = [] - if not df.empty: - df.sort_index(inplace=True) - data.append((asset.sid, df)) - - self._write(data, writer, data_frequency) + self.ingest_df( + ohlcv_df=df, + data_frequency=data_frequency, + asset=asset, + writer=writer, + empty_rows_behavior=empty_rows_behavior + ) if cleanup: - log.debug('removing bundle folder following ' - 'ingestion: {}'.format(path)) + log.debug( + 'removing bundle folder following ingestion: {}'.format(path) + ) shutil.rmtree(path) return path @@ -315,9 +337,12 @@ class ExchangeBundle: earliest_trade = None last_entry = None for asset in assets: - if (earliest_trade is None or earliest_trade > asset.start_date) \ - and asset.start_date >= self.calendar.first_session: - earliest_trade = asset.start_date + if earliest_trade is None or earliest_trade > asset.start_date: + if asset.start_date >= self.calendar.first_session: + earliest_trade = asset.start_date + + else: + earliest_trade = self.calendar.first_session end_asset = asset.end_minute if data_frequency == 'minute' else \ asset.end_daily diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 2e3982f0..77e0fc56 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -1,6 +1,8 @@ import json import os import pickle + +from catalyst.assets._assets import TradingPair from six.moves.urllib import request from datetime import date, datetime @@ -57,6 +59,11 @@ def get_exchange_symbols(exchange_name, environ=None): ) +def get_symbols_string(assets): + array = [assets] if isinstance(assets, TradingPair) else assets + return ', '.join([asset.symbol for asset in array]) + + def get_exchange_auth(exchange_name, environ=None): exchange_folder = get_exchange_folder(exchange_name, environ) filename = os.path.join(exchange_folder, 'auth.json') diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index a1f80461..468463b0 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -22,7 +22,7 @@ from catalyst.exchange.exchange_errors import ( from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ ExchangeStopLimitOrder from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \ - download_exchange_symbols + download_exchange_symbols, get_symbols_string from catalyst.exchange.poloniex.poloniex_api import Poloniex_api from catalyst.finance.order import Order, ORDER_STATUS from catalyst.finance.transaction import Transaction @@ -189,20 +189,32 @@ class Poloniex(Exchange): if end_dt is None: end_dt = pd.Timestamp.utcnow() - if ( - data_frequency == '5m' or data_frequency == 'minute'): # TODO: Polo does not have '1m' + log.debug( + 'retrieving {bars} {freq} candles on {exchange} from ' + '{end_dt} for markets {symbols}, '.format( + bars=bar_count, + freq=data_frequency, + exchange=self.name, + end_dt=end_dt, + symbols=get_symbols_string(assets) + ) + ) + + if data_frequency == '5m': frequency = 300 - elif (data_frequency == '15m'): + elif data_frequency == '15m': frequency = 900 - elif (data_frequency == '30m'): + elif data_frequency == '30m': frequency = 1800 - elif (data_frequency == '2h'): + elif data_frequency == '2h': frequency = 7200 - elif (data_frequency == '4h'): + elif data_frequency == '4h': frequency = 14400 - elif (data_frequency == '1D' or data_frequency == 'daily'): + elif data_frequency == '1D' or data_frequency == 'daily': frequency = 86400 else: + # Poloniex does not offer 1m data candles + # It is likely to error out there frequently raise InvalidHistoryFrequencyError( frequency=data_frequency ) diff --git a/tests/exchange/test_bittrex.py b/tests/exchange/test_bittrex.py index bbf52fa1..779c2a08 100644 --- a/tests/exchange/test_bittrex.py +++ b/tests/exchange/test_bittrex.py @@ -1,3 +1,4 @@ +import pandas as pd from catalyst.exchange.bittrex.bittrex import Bittrex from catalyst.finance.order import Order from base import BaseExchangeTestCase @@ -7,15 +8,15 @@ from catalyst.exchange.exchange_utils import get_exchange_auth log = Logger('test_bittrex') -class TestBittrexTestCase(BaseExchangeTestCase): +class TestBittrex(BaseExchangeTestCase): @classmethod def setup(self): - print ('creating bittrex object') auth = get_exchange_auth('bittrex') self.exchange = Bittrex( key=auth['key'], secret=auth['secret'], - base_currency='btc' + base_currency=None, + portfolio=None ) def test_order(self): @@ -52,15 +53,18 @@ class TestBittrexTestCase(BaseExchangeTestCase): log.info('retrieving candles') ohlcv_neo = self.exchange.get_candles( data_frequency='5m', - assets=self.exchange.get_asset('neo_btc') + assets=self.exchange.get_asset('neo_btc'), + bar_count=20, + end_dt=pd.to_datetime('2017-10-20', utc=True) ) ohlcv_neo_ubq = self.exchange.get_candles( - data_frequency='5m', + data_frequency='1d', assets=[ self.exchange.get_asset('neo_btc'), self.exchange.get_asset('ubq_btc') ], - bar_count=14 + bar_count=14, + end_dt=pd.to_datetime('2017-10-20', utc=True) ) pass diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 9487e486..9c39e4e4 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -40,16 +40,16 @@ class TestExchangeBundle: def test_ingest_minute(self): data_frequency = 'minute' - exchange_name = 'poloniex' + exchange_name = 'bitfinex' exchange = get_exchange(exchange_name) exchange_bundle = ExchangeBundle(exchange) assets = [ - exchange.get_asset('burst_btc') + exchange.get_asset('iot_btc') ] # start = pd.to_datetime('2017-09-01', utc=True) - start = pd.to_datetime('2017-9-15', utc=True) + start = pd.to_datetime('2017-9-01', utc=True) end = pd.to_datetime('2017-9-30', utc=True) log.info('ingesting exchange bundle {}'.format(exchange_name)) @@ -318,15 +318,15 @@ class TestExchangeBundle: pass def test_validate_data(self): - exchange_name = 'poloniex' + exchange_name = 'bitfinex' data_frequency = 'minute' exchange = get_exchange(exchange_name) exchange_bundle = ExchangeBundle(exchange) - assets = [exchange.get_asset('neos_btc')] + assets = [exchange.get_asset('iot_btc')] - end_dt = pd.to_datetime('2017-10-20', utc=True) - bar_count = 100 + end_dt = pd.to_datetime('2017-9-2 1:00', utc=True) + bar_count = 60 bundle_series = exchange_bundle.get_history_window_series( assets=assets, @@ -349,12 +349,11 @@ class TestExchangeBundle: data=dict(bundle_price=bundle_series[asset]), index=bundle_series[asset].index ) - bundle_df = bundle_df.resample('5T').last() - exchange_series = exchange.get_series_from_candles( candles=candles[asset], start_dt=start_dt, end_dt=end_dt, + data_frequency=data_frequency, field='close' ) exchange_df = pd.DataFrame( @@ -372,3 +371,58 @@ class TestExchangeBundle: df = pd.concat(frames) print('\n' + df_to_string(df)) pass + + def test_ingest_candles(self): + exchange_name = 'bitfinex' + data_frequency = 'minute' + + exchange = get_exchange(exchange_name) + bundle = ExchangeBundle(exchange) + assets = [exchange.get_asset('iot_btc')] + + end_dt = pd.to_datetime('2017-10-20', utc=True) + bar_count = 100 + + start_dt = get_start_dt(end_dt, bar_count, data_frequency) + candles = exchange.get_candles( + assets=assets, + start_dt=start_dt, + end_dt=end_dt, + bar_count=bar_count, + data_frequency=data_frequency + ) + + writer = bundle.get_writer(start_dt, end_dt, data_frequency) + for asset in assets: + dates = [candle['last_traded'] for candle in candles[asset]] + + values = dict() + for field in ['open', 'high', 'low', 'close', 'volume']: + values[field] = [candle[field] for candle in candles[asset]] + + periods = bundle.get_calendar_periods_range( + start_dt, end_dt, data_frequency + ) + df = pd.DataFrame(values, index=dates) + df = df.loc[periods].fillna(method='ffill') + + # TODO: why do I get an extra bar? + bundle.ingest_df( + ohlcv_df=df, + data_frequency=data_frequency, + asset=asset, + writer=writer, + empty_rows_behavior='raise' + ) + + bundle_series = bundle.get_history_window_series( + assets=assets, + end_dt=end_dt, + bar_count=bar_count, + field='close', + data_frequency=data_frequency, + reset_reader=True + ) + df = pd.DataFrame(bundle_series) + print('\n' + df_to_string(df)) + pass