Added data validation unit tests and minor fixes to the get_candles method of Poloniex.

This commit is contained in:
fredfortier
2017-10-26 02:33:17 -04:00
parent bb1d96ed5d
commit fdc5a30060
9 changed files with 232 additions and 91 deletions
+11 -1
View File
@@ -23,7 +23,7 @@ from catalyst.exchange.exchange_errors import (
from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \
ExchangeStopLimitOrder, ExchangeStopOrder
from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \
download_exchange_symbols
download_exchange_symbols, get_symbols_string
from catalyst.finance.order import Order, ORDER_STATUS
from catalyst.protocol import Account
@@ -255,6 +255,16 @@ class Bitfinex(Exchange):
'1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D',
'1M'
"""
log.debug(
'retrieving {bars} {freq} candles on {exchange} from '
'{end_dt} for markets {symbols}, '.format(
bars=bar_count,
freq=data_frequency,
exchange=self.name,
end_dt=end_dt,
symbols=get_symbols_string(assets)
)
)
freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I)
if freq_match:
+28 -6
View File
@@ -1,6 +1,7 @@
import json
import pandas as pd
import time
from catalyst.assets._assets import TradingPair
from logbook import Logger
from six.moves import urllib
@@ -13,10 +14,12 @@ from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \
ExchangeRequestError, InvalidOrderStyle, OrderNotFound, OrderCancelError, \
CreateOrderError
from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \
download_exchange_symbols
download_exchange_symbols, get_symbols_string
from catalyst.finance.execution import LimitOrder, StopLimitOrder
from catalyst.finance.order import Order, ORDER_STATUS
# TODO: consider using this: https://github.com/mondeja/bittrex_v2
log = Logger('Bittrex', level=LOG_LEVEL)
URL2 = 'https://bittrex.com/Api/v2.0'
@@ -217,10 +220,27 @@ class Bittrex(Exchange):
:param data_frequency:
:param assets:
:param bar_count:
:param start_dt
:param end_dt
:return:
"""
log.info('retrieving candles')
# TODO: this has no effect at the moment
if end_dt is None:
end_dt = pd.Timestamp.utcnow()
log.debug(
'retrieving {bars} {freq} candles on {exchange} from '
'{end_dt} for markets {symbols}, '.format(
bars=bar_count,
freq=data_frequency,
exchange=self.name,
end_dt=end_dt,
symbols=get_symbols_string(assets)
)
)
data_frequency = data_frequency.lower()
if data_frequency == 'minute' or data_frequency == '1m':
frequency = 'oneMin'
elif data_frequency == '5m':
@@ -229,7 +249,7 @@ class Bittrex(Exchange):
frequency = 'thirtyMin'
elif data_frequency == '1h':
frequency = 'hour'
elif data_frequency == 'daily' or data_frequency == '1D':
elif data_frequency == 'daily' or data_frequency == '1d':
frequency = 'day'
else:
raise InvalidHistoryFrequencyError(
@@ -238,13 +258,14 @@ class Bittrex(Exchange):
# Making sure that assets are iterable
asset_list = [assets] if isinstance(assets, TradingPair) else assets
ohlc_map = dict()
for asset in asset_list:
end = int(time.mktime(end_dt.timetuple()))
url = '{url}/pub/market/GetTicks?marketName={symbol}' \
'&tickInterval={frequency}&_=1499127220008'.format(
'&tickInterval={frequency}&_={end}'.format(
url=URL2,
symbol=self.get_symbol(asset),
frequency=frequency
frequency=frequency,
end=end
)
try:
@@ -272,6 +293,7 @@ class Bittrex(Exchange):
return ohlc
ordered_candles = list(reversed(candles))
ohlc_map = dict()
if bar_count is None:
ohlc_map[asset] = ohlc_from_candle(ordered_candles[0])
else:
+9 -3
View File
@@ -373,7 +373,7 @@ class Exchange:
return value
def get_series_from_candles(self, candles, start_dt, end_dt,
field, previous_value=None):
data_frequency, field, previous_value=None):
"""
Get a series of field data for the specified candles.
@@ -388,9 +388,12 @@ class Exchange:
dates = [candle['last_traded'] for candle in candles]
values = [candle[field] for candle in candles]
periods = pd.date_range(start_dt, end_dt)
periods = self.bundle.get_calendar_periods_range(
start_dt, end_dt, data_frequency
)
series = pd.Series(values, index=dates)
#TODO: ensure that this working as expected, if not use fillna
series.reindex(periods, method='ffill', fill_value=previous_value)
return series
@@ -487,6 +490,7 @@ class Exchange:
data_frequency=data_frequency,
assets=asset,
bar_count=trailing_bar_count,
start_dt=start_dt,
end_dt=end_dt
)
@@ -497,6 +501,7 @@ class Exchange:
candles=candles,
start_dt=trailing_dt,
end_dt=end_dt,
data_frequency=data_frequency,
field=field,
previous_value=last_value
)
@@ -784,13 +789,14 @@ class Exchange:
pass
@abc.abstractmethod
def get_orderbook(self, asset, order_type):
def get_orderbook(self, asset, order_type, limit):
"""
Retrieve the the orderbook for the given trading pair.
:param asset: TradingPair
:param order_type: str
The type of orders: bid, ask or all
:param limit
:return:
"""
+3 -2
View File
@@ -16,7 +16,7 @@ class BcolzExchangeBarWriter(BcolzMinuteBarWriter):
end_session = end_session.floor('1d')
minutes_per_day = 1440 if self._data_frequency == 'minute' else 1
default_ohlc_ratio = kwargs.pop('default_ohlc_ratio', 1000000)
default_ohlc_ratio = kwargs.pop('default_ohlc_ratio', 100000000)
calendar = get_calendar('OPEN')
super(BcolzExchangeBarWriter, self) \
@@ -79,8 +79,9 @@ class BcolzExchangeBarReader(BcolzMinuteBarReader):
if mask is None:
mask = a != 0
inverse_ratio = self._ohlc_ratio_inverse_for_sid(sid)
out[:len(mask), i][mask] = (
a[mask] * self._ohlc_ratio_inverse_for_sid(sid)
a[mask] * inverse_ratio
)
if field in fields:
+81 -56
View File
@@ -194,6 +194,71 @@ class ExchangeBundle:
if data_frequency == 'minute' \
else self.calendar.sessions_in_range(start_dt, end_dt)
def ingest_df(self, ohlcv_df, data_frequency, asset, writer,
empty_rows_behavior='strip'):
"""
Ingest a DataFrame of OHLCV data for a given market.
:param ohlcv_df:
:param data_frequency:
:param asset:
:param writer:
:param path:
:param empty_rows_behavior:
:return:
"""
if empty_rows_behavior is not 'ignore':
nan_rows = ohlcv_df[ohlcv_df.isnull().T.any().T].index
if len(nan_rows) > 0:
dates = []
previous_date = None
for row_date in nan_rows.values:
row_date = pd.to_datetime(row_date)
if previous_date is None:
dates.append(row_date)
else:
seq_date = previous_date + get_delta(1, data_frequency)
if row_date > seq_date:
dates.append(previous_date)
dates.append(row_date)
previous_date = row_date
dates.append(pd.to_datetime(nan_rows.values[-1]))
name = '{} from {} to {}'.format(
asset.symbol, ohlcv_df.index[0], ohlcv_df.index[-1]
)
if empty_rows_behavior == 'warn':
log.warn(
'\n{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}'.format(
name=name,
end_minute=asset.end_minute,
dates=dates
)
)
elif empty_rows_behavior == 'raise':
raise EmptyValuesInBundleError(
name=name,
end_minute=asset.end_minute,
dates=dates
)
else:
ohlcv_df.dropna(inplace=True)
data = []
if not ohlcv_df.empty:
ohlcv_df.sort_index(inplace=True)
data.append((asset.sid, ohlcv_df))
self._write(data, writer, data_frequency)
def ingest_ctable(self, asset, data_frequency, period, start_dt, end_dt,
writer, empty_rows_behavior='strip', cleanup=False):
"""
@@ -242,62 +307,19 @@ class ExchangeBundle:
periods = self.get_calendar_periods_range(
start_dt, end_dt, data_frequency
)
df = get_df_from_arrays(arrays, periods)
if empty_rows_behavior is not 'ignore':
nan_rows = df[df.isnull().T.any().T].index
if len(nan_rows) > 0:
dates = []
previous_date = None
for row_date in nan_rows.values:
row_date = pd.to_datetime(row_date)
if previous_date is None:
dates.append(row_date)
else:
seq_date = previous_date + get_delta(1, data_frequency)
if row_date > seq_date:
dates.append(previous_date)
dates.append(row_date)
previous_date = row_date
dates.append(pd.to_datetime(nan_rows.values[-1]))
name = path.split('/')[-1]
if empty_rows_behavior == 'warn':
log.warn(
'\n{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}'.format(
name=name,
end_minute=asset.end_minute,
dates=dates
)
)
elif empty_rows_behavior == 'raise':
raise EmptyValuesInBundleError(
name=name,
end_minute=asset.end_minute,
dates=dates
)
else:
df.dropna(inplace=True)
data = []
if not df.empty:
df.sort_index(inplace=True)
data.append((asset.sid, df))
self._write(data, writer, data_frequency)
self.ingest_df(
ohlcv_df=df,
data_frequency=data_frequency,
asset=asset,
writer=writer,
empty_rows_behavior=empty_rows_behavior
)
if cleanup:
log.debug('removing bundle folder following '
'ingestion: {}'.format(path))
log.debug(
'removing bundle folder following ingestion: {}'.format(path)
)
shutil.rmtree(path)
return path
@@ -315,9 +337,12 @@ class ExchangeBundle:
earliest_trade = None
last_entry = None
for asset in assets:
if (earliest_trade is None or earliest_trade > asset.start_date) \
and asset.start_date >= self.calendar.first_session:
earliest_trade = asset.start_date
if earliest_trade is None or earliest_trade > asset.start_date:
if asset.start_date >= self.calendar.first_session:
earliest_trade = asset.start_date
else:
earliest_trade = self.calendar.first_session
end_asset = asset.end_minute if data_frequency == 'minute' else \
asset.end_daily
+7
View File
@@ -1,6 +1,8 @@
import json
import os
import pickle
from catalyst.assets._assets import TradingPair
from six.moves.urllib import request
from datetime import date, datetime
@@ -57,6 +59,11 @@ def get_exchange_symbols(exchange_name, environ=None):
)
def get_symbols_string(assets):
array = [assets] if isinstance(assets, TradingPair) else assets
return ', '.join([asset.symbol for asset in array])
def get_exchange_auth(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
filename = os.path.join(exchange_folder, 'auth.json')
+20 -8
View File
@@ -22,7 +22,7 @@ from catalyst.exchange.exchange_errors import (
from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \
ExchangeStopLimitOrder
from catalyst.exchange.exchange_utils import get_exchange_symbols_filename, \
download_exchange_symbols
download_exchange_symbols, get_symbols_string
from catalyst.exchange.poloniex.poloniex_api import Poloniex_api
from catalyst.finance.order import Order, ORDER_STATUS
from catalyst.finance.transaction import Transaction
@@ -189,20 +189,32 @@ class Poloniex(Exchange):
if end_dt is None:
end_dt = pd.Timestamp.utcnow()
if (
data_frequency == '5m' or data_frequency == 'minute'): # TODO: Polo does not have '1m'
log.debug(
'retrieving {bars} {freq} candles on {exchange} from '
'{end_dt} for markets {symbols}, '.format(
bars=bar_count,
freq=data_frequency,
exchange=self.name,
end_dt=end_dt,
symbols=get_symbols_string(assets)
)
)
if data_frequency == '5m':
frequency = 300
elif (data_frequency == '15m'):
elif data_frequency == '15m':
frequency = 900
elif (data_frequency == '30m'):
elif data_frequency == '30m':
frequency = 1800
elif (data_frequency == '2h'):
elif data_frequency == '2h':
frequency = 7200
elif (data_frequency == '4h'):
elif data_frequency == '4h':
frequency = 14400
elif (data_frequency == '1D' or data_frequency == 'daily'):
elif data_frequency == '1D' or data_frequency == 'daily':
frequency = 86400
else:
# Poloniex does not offer 1m data candles
# It is likely to error out there frequently
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
+10 -6
View File
@@ -1,3 +1,4 @@
import pandas as pd
from catalyst.exchange.bittrex.bittrex import Bittrex
from catalyst.finance.order import Order
from base import BaseExchangeTestCase
@@ -7,15 +8,15 @@ from catalyst.exchange.exchange_utils import get_exchange_auth
log = Logger('test_bittrex')
class TestBittrexTestCase(BaseExchangeTestCase):
class TestBittrex(BaseExchangeTestCase):
@classmethod
def setup(self):
print ('creating bittrex object')
auth = get_exchange_auth('bittrex')
self.exchange = Bittrex(
key=auth['key'],
secret=auth['secret'],
base_currency='btc'
base_currency=None,
portfolio=None
)
def test_order(self):
@@ -52,15 +53,18 @@ class TestBittrexTestCase(BaseExchangeTestCase):
log.info('retrieving candles')
ohlcv_neo = self.exchange.get_candles(
data_frequency='5m',
assets=self.exchange.get_asset('neo_btc')
assets=self.exchange.get_asset('neo_btc'),
bar_count=20,
end_dt=pd.to_datetime('2017-10-20', utc=True)
)
ohlcv_neo_ubq = self.exchange.get_candles(
data_frequency='5m',
data_frequency='1d',
assets=[
self.exchange.get_asset('neo_btc'),
self.exchange.get_asset('ubq_btc')
],
bar_count=14
bar_count=14,
end_dt=pd.to_datetime('2017-10-20', utc=True)
)
pass
+63 -9
View File
@@ -40,16 +40,16 @@ class TestExchangeBundle:
def test_ingest_minute(self):
data_frequency = 'minute'
exchange_name = 'poloniex'
exchange_name = 'bitfinex'
exchange = get_exchange(exchange_name)
exchange_bundle = ExchangeBundle(exchange)
assets = [
exchange.get_asset('burst_btc')
exchange.get_asset('iot_btc')
]
# start = pd.to_datetime('2017-09-01', utc=True)
start = pd.to_datetime('2017-9-15', utc=True)
start = pd.to_datetime('2017-9-01', utc=True)
end = pd.to_datetime('2017-9-30', utc=True)
log.info('ingesting exchange bundle {}'.format(exchange_name))
@@ -318,15 +318,15 @@ class TestExchangeBundle:
pass
def test_validate_data(self):
exchange_name = 'poloniex'
exchange_name = 'bitfinex'
data_frequency = 'minute'
exchange = get_exchange(exchange_name)
exchange_bundle = ExchangeBundle(exchange)
assets = [exchange.get_asset('neos_btc')]
assets = [exchange.get_asset('iot_btc')]
end_dt = pd.to_datetime('2017-10-20', utc=True)
bar_count = 100
end_dt = pd.to_datetime('2017-9-2 1:00', utc=True)
bar_count = 60
bundle_series = exchange_bundle.get_history_window_series(
assets=assets,
@@ -349,12 +349,11 @@ class TestExchangeBundle:
data=dict(bundle_price=bundle_series[asset]),
index=bundle_series[asset].index
)
bundle_df = bundle_df.resample('5T').last()
exchange_series = exchange.get_series_from_candles(
candles=candles[asset],
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency,
field='close'
)
exchange_df = pd.DataFrame(
@@ -372,3 +371,58 @@ class TestExchangeBundle:
df = pd.concat(frames)
print('\n' + df_to_string(df))
pass
def test_ingest_candles(self):
exchange_name = 'bitfinex'
data_frequency = 'minute'
exchange = get_exchange(exchange_name)
bundle = ExchangeBundle(exchange)
assets = [exchange.get_asset('iot_btc')]
end_dt = pd.to_datetime('2017-10-20', utc=True)
bar_count = 100
start_dt = get_start_dt(end_dt, bar_count, data_frequency)
candles = exchange.get_candles(
assets=assets,
start_dt=start_dt,
end_dt=end_dt,
bar_count=bar_count,
data_frequency=data_frequency
)
writer = bundle.get_writer(start_dt, end_dt, data_frequency)
for asset in assets:
dates = [candle['last_traded'] for candle in candles[asset]]
values = dict()
for field in ['open', 'high', 'low', 'close', 'volume']:
values[field] = [candle[field] for candle in candles[asset]]
periods = bundle.get_calendar_periods_range(
start_dt, end_dt, data_frequency
)
df = pd.DataFrame(values, index=dates)
df = df.loc[periods].fillna(method='ffill')
# TODO: why do I get an extra bar?
bundle.ingest_df(
ohlcv_df=df,
data_frequency=data_frequency,
asset=asset,
writer=writer,
empty_rows_behavior='raise'
)
bundle_series = bundle.get_history_window_series(
assets=assets,
end_dt=end_dt,
bar_count=bar_count,
field='close',
data_frequency=data_frequency,
reset_reader=True
)
df = pd.DataFrame(bundle_series)
print('\n' + df_to_string(df))
pass