diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 1edb53f4..3a1e2733 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -509,6 +509,32 @@ def ingest_exchange(exchange_name, data_frequency, start, end, ) +@main.command(name='clean-exchange') +@click.option( + '-x', + '--exchange-name', + type=click.Choice({'bitfinex', 'bittrex', 'poloniex'}), + help='The name of the exchange bundle to ingest (supported: bitfinex,' + ' bittrex, poloniex).', +) +@click.option( + '-f', + '--data-frequency', + type=click.Choice({'daily', 'minute', 'daily,minute', 'minute,daily'}), + default=None, + help='The data frequency of the desired OHLCV bars.', +) +def clean_exchange(exchange_name, data_frequency): + exchange = get_exchange(exchange_name) + exchange_bundle = ExchangeBundle(exchange) + + click.echo('Cleaning exchange bundle {}...'.format(exchange_name)) + exchange_bundle.clean( + data_frequency=data_frequency, + ) + click.echo('Done') + + @main.command() @click.option( '-b', diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 80fbf719..76d7205b 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -18,7 +18,8 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ InvalidHistoryFrequencyError, TempBundleNotFoundError, \ NoDataAvailableOnExchange, \ PricingDataNotLoadedError -from catalyst.exchange.exchange_utils import get_exchange_folder +from catalyst.exchange.exchange_utils import get_exchange_folder, \ + get_exchange_bundles_folder from catalyst.utils.cli import maybe_show_progress from catalyst.utils.paths import ensure_directory @@ -226,18 +227,18 @@ class ExchangeBundle: if reader is None: raise TempBundleNotFoundError(path=path) - arrays = None - try: - arrays = reader.load_raw_arrays( - sids=[asset.sid], - fields=['open', 'high', 'low', 'close', 'volume'], - start_dt=start_dt, - end_dt=end_dt - ) - except Exception as e: - log.warn('skipping ctable for {} from {} to {}: {}'.format( - asset.symbol, start_dt, end_dt, e - )) + # arrays = None + # try: + arrays = reader.load_raw_arrays( + sids=[asset.sid], + fields=['open', 'high', 'low', 'close', 'volume'], + start_dt=start_dt, + end_dt=end_dt + ) + # except Exception as e: + # log.warn('skipping ctable for {} from {} to {}: {}'.format( + # asset.symbol, start_dt, end_dt, e + # )) if not arrays: return path @@ -438,7 +439,8 @@ class ExchangeBundle: start_dt=chunk['period_start'], end_dt=chunk['period_end'], writer=writer, - empty_rows_behavior='strip' + empty_rows_behavior='strip', + cleanup=True ) def ingest(self, data_frequency, include_symbols=None, @@ -525,7 +527,7 @@ class ExchangeBundle: return values except Exception: - symbols = [asset.symbol.encode('utf-8') for asset in assets] + symbols = [asset.symbol for asset in assets] raise PricingDataNotLoadedError( field=field, first_trading_day=min([asset.start_date for asset in assets]), @@ -552,7 +554,7 @@ class ExchangeBundle: reader = self.get_reader(data_frequency) if reader is None: - symbols = [asset.symbol.encode('utf-8') for asset in assets] + symbols = [asset.symbol for asset in assets] raise PricingDataNotLoadedError( field=field, first_trading_day=min([asset.start_date for asset in assets]), @@ -610,3 +612,30 @@ class ExchangeBundle: series[asset] = value_series return series + + def clean(self, data_frequency): + log.debug('cleaning exchange {}, frequency {}'.format( + self.exchange.name, data_frequency + )) + root = get_exchange_folder(self.exchange.name) + + temp_bundles = os.path.join(root, 'temp_bundles') + + if os.path.isdir(temp_bundles): + log.debug('removing folder and content: {}'.format(temp_bundles)) + shutil.rmtree(temp_bundles) + log.debug('{} removed'.format(temp_bundles)) + + frequencies = ['daily', 'minute'] if data_frequency is None \ + else [data_frequency] + + for frequency in frequencies: + label = '{}_bundle'.format(frequency) + frequency_bundle = os.path.join(root, label) + + if os.path.isdir(frequency_bundle): + log.debug( + 'removing folder and content: {}'.format(frequency_bundle) + ) + shutil.rmtree(frequency_bundle) + log.debug('{} removed'.format(frequency_bundle))