diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 60758487..d3ca57c8 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -443,7 +443,7 @@ def live(ctx, return perf -@main.command() +@main.command(name='ingest-exchange') @click.option( '-x', '--exchange-name', @@ -452,6 +452,7 @@ def live(ctx, ' bittrex, poloniex).', ) @click.option( + '-f', '--data-frequency', type=click.Choice({'daily', 'minute', 'daily,minute'}), default='daily', @@ -473,6 +474,7 @@ def live(ctx, help='The end date of the data range. (default: today)', ) @click.option( + '-i', '--include-symbols', default=None, help='A list of symbols to ingest (optional comma separated list)', @@ -493,7 +495,7 @@ def ingest_exchange(exchange_name, data_frequency, start, end, """ Ingest data for the given exchange. """ - exchange=get_exchange(exchange_name) + exchange = get_exchange(exchange_name) exchange_bundle = ExchangeBundle(exchange) click.echo('ingesting exchange bundle {}'.format(exchange_name)) diff --git a/catalyst/exchange/bittrex/bittrex.py b/catalyst/exchange/bittrex/bittrex.py index e59281fa..d81d49b7 100644 --- a/catalyst/exchange/bittrex/bittrex.py +++ b/catalyst/exchange/bittrex/bittrex.py @@ -28,6 +28,8 @@ class Bittrex(Exchange): self.base_currency = base_currency self._portfolio = portfolio + self.num_candles_limit = 2000 + # Not sure what the rate limit is but trying to play it safe # https://bitcoin.stackexchange.com/questions/53778/bittrex-api-rate-limit self.max_requests_per_minute = 60 diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index ce0130a1..d9b34ac8 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -3,6 +3,7 @@ from datetime import timedelta, datetime import os from logging import Logger import pandas as pd +import numpy as np import pytz @@ -113,6 +114,21 @@ def get_delta(periods, data_frequency): if data_frequency == 'minute' else timedelta(days=periods) +def get_periods(start_dt, end_dt, data_frequency): + delta = end_dt - start_dt + + if data_frequency == 'minute': + delta_periods = delta.total_seconds() / 60 + + elif data_frequency == 'daily': + delta_periods = delta.total_seconds() / 60 / 60 / 24 + + else: + raise ValueError('frequency not supported') + + return int(delta_periods) + + def get_start_dt(end_dt, bar_count, data_frequency): periods = bar_count - 1 if periods > 1: @@ -162,6 +178,31 @@ def get_ffill_candles(candles, bar_count, end_dt, data_frequency, return all_dates, all_candles +def range_in_bundle(asset, start_dt, end_dt, reader): + has_data = True + if has_data and reader is not None: + try: + start_close = \ + reader.get_value(asset.sid, start_dt, 'close') + + if np.isnan(start_close): + has_data = False + + else: + end_close = reader.get_value(asset.sid, end_dt, 'close') + + if np.isnan(end_close): + has_data = False + + except Exception: + has_data = False + + else: + has_data = False + + return has_data + + @deprecated def get_history_mock(exchange_name, data_frequency, symbol, start_ms, end_ms, exchanges): diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 36eb9628..d89e980c 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -466,7 +466,7 @@ class Exchange: exchange_start = None catalyst_end = None - if start < asset.end_minute: + if asset.end_minute is not None and start < asset.end_minute: catalyst_start = start if end <= asset.end_minute: catalyst_end = end @@ -581,13 +581,14 @@ class Exchange: if len(missing_assets) > 0: writer = bundle.get_writer(start_dt, end_dt, data_frequency) - bundle.ingest_chunk( - bar_count=adj_bar_count, - end_dt=end_dt, - data_frequency=data_frequency, - assets=missing_assets, - writer=writer - ) + for asset in missing_assets: + bundle.ingest_chunk( + bar_count=adj_bar_count, + end_dt=end_dt, + data_frequency=data_frequency, + asset=asset, + writer=writer + ) reader = bundle.get_reader(data_frequency) values = reader.load_raw_arrays( diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 34d5330b..dbd4a9ae 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,17 +1,16 @@ import os from datetime import timedelta -import numpy as np import pandas as pd -from logbook import Logger -from pandas import DatetimeIndex +from logbook import Logger, DEBUG, INFO from catalyst import get_calendar from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ BcolzMinuteBarWriter, BcolzMinuteBarReader, BcolzMinuteBarMetadata from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader -from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt +from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \ + get_periods, range_in_bundle from catalyst.exchange.exchange_utils import get_exchange_folder from catalyst.utils.cli import maybe_show_progress from catalyst.utils.paths import ensure_directory @@ -23,6 +22,7 @@ def _cachpath(symbol, type_): BUNDLE_NAME_TEMPLATE = '{root}/{frequency}_bundle' log = Logger('exchange_bundle') +log.level = INFO class ExchangeBundle: @@ -45,8 +45,8 @@ class ExchangeBundle: def get_adj_dates(self, start, end, assets): now = pd.Timestamp.utcnow() - if end > now: - log.info('adjusting the end date to now {}'.format(now)) + if end is None or end > now: + log.debug('adjusting the end date to now {}'.format(now)) end = now earliest_trade = None @@ -54,8 +54,8 @@ class ExchangeBundle: if earliest_trade is None or earliest_trade > asset.start_date: earliest_trade = asset.start_date - if earliest_trade > start: - log.info( + if start is None or earliest_trade > start: + log.debug( 'adjusting start date to earliest trade date found {}'.format( earliest_trade )) @@ -201,65 +201,30 @@ class ExchangeBundle: reader = self.get_reader(data_frequency) missing_assets = [] for asset in assets: - has_data = True - if has_data and reader is not None: - try: - start_close = \ - reader.get_value(asset.sid, start_dt, 'close') - - if np.isnan(start_close): - has_data = False - - else: - end_close = reader.get_value(asset.sid, end_dt, - 'close') - - if np.isnan(end_close): - has_data = False - - except Exception as e: - has_data = False - - else: - has_data = False + has_data = range_in_bundle(asset, start_dt, end_dt, reader) if not has_data: missing_assets.append(asset) return missing_assets - def ingest_chunk(self, bar_count, end_dt, data_frequency, assets, + def ingest_chunk(self, bar_count, end_dt, data_frequency, asset, writer, previous_candle=dict()): """ Retrieve the specified OHLCV chunk and write it to the bundle - :param chunk: - :param previous_candle: + :param bar_count: + :param end_dt: :param data_frequency: - :param assets: + :param asset: :param writer: + :param previous_candle :return: """ - chunk_assets = [] - for asset in assets: - if asset.start_date <= end_dt: - chunk_assets.append(asset) - - start_dt = get_start_dt(end_dt, bar_count, data_frequency) - missing_assets = self.filter_existing_assets( - assets=chunk_assets, - start_dt=start_dt, - end_dt=end_dt, - data_frequency=data_frequency - ) - - if len(missing_assets) == 0: - log.debug('the data chunk already exists') - return - + # The get_history method supports multiple asset candles = self.exchange.get_history( - assets=missing_assets, + assets=[asset], end_dt=end_dt, bar_count=bar_count, data_frequency=data_frequency @@ -272,7 +237,7 @@ class ExchangeBundle: if not asset_candles: log.debug( 'no data: {symbols} on {exchange}, date {end}'.format( - symbols=missing_assets, + symbols=asset, exchange=self.exchange.name, end=end_dt ) @@ -329,6 +294,7 @@ class ExchangeBundle: del self._writers[data_frequency] # TODO: these are the dates of the chunk, not the job + start_dt = get_start_dt(end_dt, bar_count, data_frequency) writer = self.get_writer(start_dt, end_dt, data_frequency) writer.write( data=data, @@ -357,8 +323,8 @@ class ExchangeBundle: assets = self.get_assets(include_symbols, exclude_symbols) start, end = self.get_adj_dates(start, end, assets) - symbols = [] - log.debug( + symbols = list(map(lambda asset: asset.symbol, assets)) + log.info( 'ingesting trading pairs {symbols} on exchange {exchange} ' 'from {start} to {end}'.format( symbols=symbols, @@ -368,39 +334,88 @@ class ExchangeBundle: ) ) - delta = end - start - if data_frequency == 'minute': - delta_periods = delta.total_seconds() / 60 - - elif data_frequency == 'daily': - delta_periods = delta.total_seconds() / 60 / 60 / 24 - - else: - raise ValueError('frequency not supported') - writer = self.get_writer(start, end, data_frequency) + reader = self.get_reader(data_frequency) - if delta_periods > self.exchange.num_candles_limit: - bar_count = self.exchange.num_candles_limit + all_chunks = [] + for asset in assets: + try: + asset_start, asset_end = \ + self.get_adj_dates(start, end, [asset]) - chunks = [] - last_chunk_date = end.floor('1 min') - while last_chunk_date > start + timedelta(minutes=bar_count): - # TODO: account for the partial last bar - chunk = dict(end=last_chunk_date, bar_count=bar_count) - chunks.append(chunk) + except ValueError as e: + log.debug('asset outside of range {} {}'.format(asset, e)) + continue - # TODO: base on frequency - last_chunk_date = \ - last_chunk_date - timedelta(minutes=(bar_count + 1)) + asset_periods = get_periods(asset_start, asset_end, data_frequency) + if asset_periods > self.exchange.num_candles_limit: + bar_count = self.exchange.num_candles_limit - chunks.reverse() + chunks = [] - else: - chunks = [dict(end=end, bar_count=delta_periods)] + period_delta = timedelta(minutes=1) \ + if data_frequency == 'minute' else \ + timedelta(days=1) + + chunk_start = asset_start.floor('1 min') - period_delta + while chunk_start < asset_end: + delta = timedelta(minutes=bar_count) \ + if data_frequency == 'minute' else \ + timedelta(days=bar_count) + + chunk_end = chunk_start + delta \ + if chunk_start + delta < asset_end else asset_end + + chunk_periods = \ + get_periods(chunk_start, chunk_end, data_frequency) + + range_start = \ + get_start_dt(chunk_end, chunk_periods, data_frequency) + + if range_in_bundle(asset, range_start, chunk_end, reader): + log.debug( + 'chunk already ingested {symbol} ' + '{start} to {end}'.format( + symbol=asset.symbol, + start=range_start, + end=chunk_end + ) + ) + + chunk_start = chunk_end + period_delta + continue + + chunk = dict( + asset=asset, + end=chunk_end, + bar_count=chunk_periods + ) + chunks.append(chunk) + + chunk_start = chunk_end + period_delta + + all_chunks += chunks + + else: + if range_in_bundle(asset, asset_start, asset_end, reader): + log.debug( + 'asset already ingested {symbol} ' + '{start} to {end}'.format( + symbol=asset.symbol, + start=asset_start, + end=asset_end + ) + ) + continue + + all_chunks += [ + dict(asset=asset, end=asset_end, bar_count=asset_periods) + ] + + all_chunks.sort(key=lambda chunk: chunk['end']) with maybe_show_progress( - chunks, + all_chunks, show_progress, label='Fetching {exchange} {frequency} candles: '.format( exchange=self.exchange.name, @@ -413,7 +428,7 @@ class ExchangeBundle: bar_count=chunk['bar_count'], end_dt=chunk['end'], data_frequency=data_frequency, - assets=assets, + asset=chunk['asset'], writer=writer, previous_candle=previous_candle, ) diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index 2bb67c45..04cb9064 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -1,6 +1,7 @@ import sys, inspect from catalyst.errors import ZiplineError + class ZiplineErrorSilent(ZiplineError): def __init__(self, **kwargs): msg = self.msg.format(**kwargs) @@ -10,7 +11,8 @@ class ZiplineErrorSilent(ZiplineError): except AttributeError: ln = inspect.currentframe().f_back.f_lineno fn = inspect.currentframe().f_back.f_code.co_filename - msg = "Error traceback: {1} (line {2})\n{0.__name__}: {3}.".format(type(self), fn, ln, msg) + msg = "Error traceback: {1} (line {2})\n{0.__name__}: {3}.".format( + type(self), fn, ln, msg) sys.exit(msg) diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index 0260b823..151fecd8 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -48,6 +48,10 @@ class Poloniex(Exchange): self.minute_reader = None self.transactions = defaultdict(list) + self.num_candles_limit = 2000 + self.max_requests_per_minute = 20 + self.request_cpt = dict() + def sanitize_curency_symbol(self, exchange_symbol): """ diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index f99ced28..43895198 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -30,6 +30,25 @@ class ExchangeBundleTestCase: ) pass + def test_ingest_minute_all(self): + exchange_name = 'bitfinex' + + # start = pd.to_datetime('2017-09-01', utc=True) + start = pd.to_datetime('2017-10-01', utc=True) + end = pd.to_datetime('2017-10-05', utc=True) + + exchange_bundle = ExchangeBundle(get_exchange(exchange_name)) + + log.info('ingesting exchange bundle {}'.format(exchange_name)) + exchange_bundle.ingest( + data_frequency='minute', + exclude_symbols=None, + start=start, + end=end, + show_progress=True + ) + pass + def test_ingest_daily(self): exchange_name = 'bitfinex'