mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-28 23:08:01 +08:00
Added method to clean bundle folders
This commit is contained in:
@@ -509,6 +509,32 @@ def ingest_exchange(exchange_name, data_frequency, start, end,
|
||||
)
|
||||
|
||||
|
||||
@main.command(name='clean-exchange')
@click.option(
    '-x',
    '--exchange-name',
    # An ordered list (not a set) keeps the choices shown in help and
    # error messages in a stable order.
    type=click.Choice(['bitfinex', 'bittrex', 'poloniex']),
    help='The name of the exchange bundle to clean (supported: bitfinex,'
         ' bittrex, poloniex).',
)
@click.option(
    '-f',
    '--data-frequency',
    type=click.Choice(['daily', 'minute', 'daily,minute', 'minute,daily']),
    default=None,
    help='The data frequency of the desired OHLCV bars.',
)
def clean_exchange(exchange_name, data_frequency):
    """Clean up the bundle folders of the given exchange.
    """
    exchange = get_exchange(exchange_name)
    exchange_bundle = ExchangeBundle(exchange)

    click.echo('Cleaning exchange bundle {}...'.format(exchange_name))
    # data_frequency is forwarded unchanged; it is None when the option
    # is omitted on the command line.
    exchange_bundle.clean(
        data_frequency=data_frequency,
    )
    click.echo('Done')
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option(
|
||||
'-b',
|
||||
|
||||
@@ -18,7 +18,8 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
|
||||
InvalidHistoryFrequencyError, TempBundleNotFoundError, \
|
||||
NoDataAvailableOnExchange, \
|
||||
PricingDataNotLoadedError
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder, \
|
||||
get_exchange_bundles_folder
|
||||
from catalyst.utils.cli import maybe_show_progress
|
||||
from catalyst.utils.paths import ensure_directory
|
||||
|
||||
@@ -226,18 +227,18 @@ class ExchangeBundle:
|
||||
if reader is None:
|
||||
raise TempBundleNotFoundError(path=path)
|
||||
|
||||
arrays = None
|
||||
try:
|
||||
arrays = reader.load_raw_arrays(
|
||||
sids=[asset.sid],
|
||||
fields=['open', 'high', 'low', 'close', 'volume'],
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt
|
||||
)
|
||||
except Exception as e:
|
||||
log.warn('skipping ctable for {} from {} to {}: {}'.format(
|
||||
asset.symbol, start_dt, end_dt, e
|
||||
))
|
||||
# arrays = None
|
||||
# try:
|
||||
arrays = reader.load_raw_arrays(
|
||||
sids=[asset.sid],
|
||||
fields=['open', 'high', 'low', 'close', 'volume'],
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt
|
||||
)
|
||||
# except Exception as e:
|
||||
# log.warn('skipping ctable for {} from {} to {}: {}'.format(
|
||||
# asset.symbol, start_dt, end_dt, e
|
||||
# ))
|
||||
|
||||
if not arrays:
|
||||
return path
|
||||
@@ -438,7 +439,8 @@ class ExchangeBundle:
|
||||
start_dt=chunk['period_start'],
|
||||
end_dt=chunk['period_end'],
|
||||
writer=writer,
|
||||
empty_rows_behavior='strip'
|
||||
empty_rows_behavior='strip',
|
||||
cleanup=True
|
||||
)
|
||||
|
||||
def ingest(self, data_frequency, include_symbols=None,
|
||||
@@ -525,7 +527,7 @@ class ExchangeBundle:
|
||||
return values
|
||||
|
||||
except Exception:
|
||||
symbols = [asset.symbol.encode('utf-8') for asset in assets]
|
||||
symbols = [asset.symbol for asset in assets]
|
||||
raise PricingDataNotLoadedError(
|
||||
field=field,
|
||||
first_trading_day=min([asset.start_date for asset in assets]),
|
||||
@@ -552,7 +554,7 @@ class ExchangeBundle:
|
||||
reader = self.get_reader(data_frequency)
|
||||
|
||||
if reader is None:
|
||||
symbols = [asset.symbol.encode('utf-8') for asset in assets]
|
||||
symbols = [asset.symbol for asset in assets]
|
||||
raise PricingDataNotLoadedError(
|
||||
field=field,
|
||||
first_trading_day=min([asset.start_date for asset in assets]),
|
||||
@@ -610,3 +612,30 @@ class ExchangeBundle:
|
||||
series[asset] = value_series
|
||||
|
||||
return series
|
||||
|
||||
def clean(self, data_frequency):
    """
    Remove the bundle folders stored on disk for this exchange.

    The ``temp_bundles`` folder under the exchange folder is removed
    first (when it exists), followed by the ``<frequency>_bundle``
    folder of every requested frequency (when it exists).

    Parameters
    ----------
    data_frequency: str or None
        A single frequency (e.g. 'daily' or 'minute'), several
        frequencies separated by commas (e.g. 'daily,minute'), or None
        to clean both the daily and minute bundles.

    """
    log.debug('cleaning exchange {}, frequency {}'.format(
        self.exchange.name, data_frequency
    ))
    root = get_exchange_folder(self.exchange.name)

    if data_frequency is None:
        frequencies = ['daily', 'minute']
    else:
        # The command line accepts combined values such as
        # 'daily,minute'; split them so each frequency maps to its own
        # '<frequency>_bundle' folder instead of a non-existent
        # 'daily,minute_bundle' folder.
        frequencies = [
            frequency.strip()
            for frequency in data_frequency.split(',')
            if frequency.strip()
        ]

    folders = [os.path.join(root, 'temp_bundles')]
    folders.extend(
        os.path.join(root, '{}_bundle'.format(frequency))
        for frequency in frequencies
    )

    for folder in folders:
        # Folders that do not exist are skipped silently.
        if os.path.isdir(folder):
            log.debug('removing folder and content: {}'.format(folder))
            shutil.rmtree(folder)
            log.debug('{} removed'.format(folder))
|
||||
|
||||
Reference in New Issue
Block a user