Bug fixes and housekeeping from ingestion testing

This commit is contained in:
fredfortier
2017-10-12 00:51:18 -04:00
parent 3f9b44f3e4
commit 4895bef392
8 changed files with 177 additions and 91 deletions
+4 -2
View File
@@ -443,7 +443,7 @@ def live(ctx,
return perf
@main.command()
@main.command(name='ingest-exchange')
@click.option(
'-x',
'--exchange-name',
@@ -452,6 +452,7 @@ def live(ctx,
' bittrex, poloniex).',
)
@click.option(
'-f',
'--data-frequency',
type=click.Choice({'daily', 'minute', 'daily,minute'}),
default='daily',
@@ -473,6 +474,7 @@ def live(ctx,
help='The end date of the data range. (default: today)',
)
@click.option(
'-i',
'--include-symbols',
default=None,
help='A list of symbols to ingest (optional comma separated list)',
@@ -493,7 +495,7 @@ def ingest_exchange(exchange_name, data_frequency, start, end,
"""
Ingest data for the given exchange.
"""
exchange=get_exchange(exchange_name)
exchange = get_exchange(exchange_name)
exchange_bundle = ExchangeBundle(exchange)
click.echo('ingesting exchange bundle {}'.format(exchange_name))
+2
View File
@@ -28,6 +28,8 @@ class Bittrex(Exchange):
self.base_currency = base_currency
self._portfolio = portfolio
self.num_candles_limit = 2000
# Not sure what the rate limit is but trying to play it safe
# https://bitcoin.stackexchange.com/questions/53778/bittrex-api-rate-limit
self.max_requests_per_minute = 60
+41
View File
@@ -3,6 +3,7 @@ from datetime import timedelta, datetime
import os
from logging import Logger
import pandas as pd
import numpy as np
import pytz
@@ -113,6 +114,21 @@ def get_delta(periods, data_frequency):
if data_frequency == 'minute' else timedelta(days=periods)
def get_periods(start_dt, end_dt, data_frequency):
delta = end_dt - start_dt
if data_frequency == 'minute':
delta_periods = delta.total_seconds() / 60
elif data_frequency == 'daily':
delta_periods = delta.total_seconds() / 60 / 60 / 24
else:
raise ValueError('frequency not supported')
return int(delta_periods)
def get_start_dt(end_dt, bar_count, data_frequency):
periods = bar_count - 1
if periods > 1:
@@ -162,6 +178,31 @@ def get_ffill_candles(candles, bar_count, end_dt, data_frequency,
return all_dates, all_candles
def range_in_bundle(asset, start_dt, end_dt, reader):
has_data = True
if has_data and reader is not None:
try:
start_close = \
reader.get_value(asset.sid, start_dt, 'close')
if np.isnan(start_close):
has_data = False
else:
end_close = reader.get_value(asset.sid, end_dt, 'close')
if np.isnan(end_close):
has_data = False
except Exception:
has_data = False
else:
has_data = False
return has_data
@deprecated
def get_history_mock(exchange_name, data_frequency, symbol, start_ms, end_ms,
exchanges):
+9 -8
View File
@@ -466,7 +466,7 @@ class Exchange:
exchange_start = None
catalyst_end = None
if start < asset.end_minute:
if asset.end_minute is not None and start < asset.end_minute:
catalyst_start = start
if end <= asset.end_minute:
catalyst_end = end
@@ -581,13 +581,14 @@ class Exchange:
if len(missing_assets) > 0:
writer = bundle.get_writer(start_dt, end_dt, data_frequency)
bundle.ingest_chunk(
bar_count=adj_bar_count,
end_dt=end_dt,
data_frequency=data_frequency,
assets=missing_assets,
writer=writer
)
for asset in missing_assets:
bundle.ingest_chunk(
bar_count=adj_bar_count,
end_dt=end_dt,
data_frequency=data_frequency,
asset=asset,
writer=writer
)
reader = bundle.get_reader(data_frequency)
values = reader.load_raw_arrays(
+95 -80
View File
@@ -1,17 +1,16 @@
import os
from datetime import timedelta
import numpy as np
import pandas as pd
from logbook import Logger
from pandas import DatetimeIndex
from logbook import Logger, DEBUG, INFO
from catalyst import get_calendar
from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
BcolzMinuteBarWriter, BcolzMinuteBarReader, BcolzMinuteBarMetadata
from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
BcolzDailyBarReader
from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt
from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \
get_periods, range_in_bundle
from catalyst.exchange.exchange_utils import get_exchange_folder
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.paths import ensure_directory
@@ -23,6 +22,7 @@ def _cachpath(symbol, type_):
BUNDLE_NAME_TEMPLATE = '{root}/{frequency}_bundle'
log = Logger('exchange_bundle')
log.level = INFO
class ExchangeBundle:
@@ -45,8 +45,8 @@ class ExchangeBundle:
def get_adj_dates(self, start, end, assets):
now = pd.Timestamp.utcnow()
if end > now:
log.info('adjusting the end date to now {}'.format(now))
if end is None or end > now:
log.debug('adjusting the end date to now {}'.format(now))
end = now
earliest_trade = None
@@ -54,8 +54,8 @@ class ExchangeBundle:
if earliest_trade is None or earliest_trade > asset.start_date:
earliest_trade = asset.start_date
if earliest_trade > start:
log.info(
if start is None or earliest_trade > start:
log.debug(
'adjusting start date to earliest trade date found {}'.format(
earliest_trade
))
@@ -201,65 +201,30 @@ class ExchangeBundle:
reader = self.get_reader(data_frequency)
missing_assets = []
for asset in assets:
has_data = True
if has_data and reader is not None:
try:
start_close = \
reader.get_value(asset.sid, start_dt, 'close')
if np.isnan(start_close):
has_data = False
else:
end_close = reader.get_value(asset.sid, end_dt,
'close')
if np.isnan(end_close):
has_data = False
except Exception as e:
has_data = False
else:
has_data = False
has_data = range_in_bundle(asset, start_dt, end_dt, reader)
if not has_data:
missing_assets.append(asset)
return missing_assets
def ingest_chunk(self, bar_count, end_dt, data_frequency, assets,
def ingest_chunk(self, bar_count, end_dt, data_frequency, asset,
writer, previous_candle=dict()):
"""
Retrieve the specified OHLCV chunk and write it to the bundle
:param chunk:
:param previous_candle:
:param bar_count:
:param end_dt:
:param data_frequency:
:param assets:
:param asset:
:param writer:
:param previous_candle
:return:
"""
chunk_assets = []
for asset in assets:
if asset.start_date <= end_dt:
chunk_assets.append(asset)
start_dt = get_start_dt(end_dt, bar_count, data_frequency)
missing_assets = self.filter_existing_assets(
assets=chunk_assets,
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency
)
if len(missing_assets) == 0:
log.debug('the data chunk already exists')
return
# The get_history method supports multiple asset
candles = self.exchange.get_history(
assets=missing_assets,
assets=[asset],
end_dt=end_dt,
bar_count=bar_count,
data_frequency=data_frequency
@@ -272,7 +237,7 @@ class ExchangeBundle:
if not asset_candles:
log.debug(
'no data: {symbols} on {exchange}, date {end}'.format(
symbols=missing_assets,
symbols=asset,
exchange=self.exchange.name,
end=end_dt
)
@@ -329,6 +294,7 @@ class ExchangeBundle:
del self._writers[data_frequency]
# TODO: these are the dates of the chunk, not the job
start_dt = get_start_dt(end_dt, bar_count, data_frequency)
writer = self.get_writer(start_dt, end_dt, data_frequency)
writer.write(
data=data,
@@ -357,8 +323,8 @@ class ExchangeBundle:
assets = self.get_assets(include_symbols, exclude_symbols)
start, end = self.get_adj_dates(start, end, assets)
symbols = []
log.debug(
symbols = list(map(lambda asset: asset.symbol, assets))
log.info(
'ingesting trading pairs {symbols} on exchange {exchange} '
'from {start} to {end}'.format(
symbols=symbols,
@@ -368,39 +334,88 @@ class ExchangeBundle:
)
)
delta = end - start
if data_frequency == 'minute':
delta_periods = delta.total_seconds() / 60
elif data_frequency == 'daily':
delta_periods = delta.total_seconds() / 60 / 60 / 24
else:
raise ValueError('frequency not supported')
writer = self.get_writer(start, end, data_frequency)
reader = self.get_reader(data_frequency)
if delta_periods > self.exchange.num_candles_limit:
bar_count = self.exchange.num_candles_limit
all_chunks = []
for asset in assets:
try:
asset_start, asset_end = \
self.get_adj_dates(start, end, [asset])
chunks = []
last_chunk_date = end.floor('1 min')
while last_chunk_date > start + timedelta(minutes=bar_count):
# TODO: account for the partial last bar
chunk = dict(end=last_chunk_date, bar_count=bar_count)
chunks.append(chunk)
except ValueError as e:
log.debug('asset outside of range {} {}'.format(asset, e))
continue
# TODO: base on frequency
last_chunk_date = \
last_chunk_date - timedelta(minutes=(bar_count + 1))
asset_periods = get_periods(asset_start, asset_end, data_frequency)
if asset_periods > self.exchange.num_candles_limit:
bar_count = self.exchange.num_candles_limit
chunks.reverse()
chunks = []
else:
chunks = [dict(end=end, bar_count=delta_periods)]
period_delta = timedelta(minutes=1) \
if data_frequency == 'minute' else \
timedelta(days=1)
chunk_start = asset_start.floor('1 min') - period_delta
while chunk_start < asset_end:
delta = timedelta(minutes=bar_count) \
if data_frequency == 'minute' else \
timedelta(days=bar_count)
chunk_end = chunk_start + delta \
if chunk_start + delta < asset_end else asset_end
chunk_periods = \
get_periods(chunk_start, chunk_end, data_frequency)
range_start = \
get_start_dt(chunk_end, chunk_periods, data_frequency)
if range_in_bundle(asset, range_start, chunk_end, reader):
log.debug(
'chunk already ingested {symbol} '
'{start} to {end}'.format(
symbol=asset.symbol,
start=range_start,
end=chunk_end
)
)
chunk_start = chunk_end + period_delta
continue
chunk = dict(
asset=asset,
end=chunk_end,
bar_count=chunk_periods
)
chunks.append(chunk)
chunk_start = chunk_end + period_delta
all_chunks += chunks
else:
if range_in_bundle(asset, asset_start, asset_end, reader):
log.debug(
'asset already ingested {symbol} '
'{start} to {end}'.format(
symbol=asset.symbol,
start=asset_start,
end=asset_end
)
)
continue
all_chunks += [
dict(asset=asset, end=asset_end, bar_count=asset_periods)
]
all_chunks.sort(key=lambda chunk: chunk['end'])
with maybe_show_progress(
chunks,
all_chunks,
show_progress,
label='Fetching {exchange} {frequency} candles: '.format(
exchange=self.exchange.name,
@@ -413,7 +428,7 @@ class ExchangeBundle:
bar_count=chunk['bar_count'],
end_dt=chunk['end'],
data_frequency=data_frequency,
assets=assets,
asset=chunk['asset'],
writer=writer,
previous_candle=previous_candle,
)
+3 -1
View File
@@ -1,6 +1,7 @@
import sys, inspect
from catalyst.errors import ZiplineError
class ZiplineErrorSilent(ZiplineError):
def __init__(self, **kwargs):
msg = self.msg.format(**kwargs)
@@ -10,7 +11,8 @@ class ZiplineErrorSilent(ZiplineError):
except AttributeError:
ln = inspect.currentframe().f_back.f_lineno
fn = inspect.currentframe().f_back.f_code.co_filename
msg = "Error traceback: {1} (line {2})\n{0.__name__}: {3}.".format(type(self), fn, ln, msg)
msg = "Error traceback: {1} (line {2})\n{0.__name__}: {3}.".format(
type(self), fn, ln, msg)
sys.exit(msg)
+4
View File
@@ -48,6 +48,10 @@ class Poloniex(Exchange):
self.minute_reader = None
self.transactions = defaultdict(list)
self.num_candles_limit = 2000
self.max_requests_per_minute = 20
self.request_cpt = dict()
def sanitize_curency_symbol(self, exchange_symbol):
"""
+19
View File
@@ -30,6 +30,25 @@ class ExchangeBundleTestCase:
)
pass
def test_ingest_minute_all(self):
exchange_name = 'bitfinex'
# start = pd.to_datetime('2017-09-01', utc=True)
start = pd.to_datetime('2017-10-01', utc=True)
end = pd.to_datetime('2017-10-05', utc=True)
exchange_bundle = ExchangeBundle(get_exchange(exchange_name))
log.info('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle.ingest(
data_frequency='minute',
exclude_symbols=None,
start=start,
end=end,
show_progress=True
)
pass
def test_ingest_daily(self):
exchange_name = 'bitfinex'