mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-27 17:47:56 +08:00
ENH: ingesting data for a non-existent pair now logs a warning.
This commit is contained in:
@@ -580,7 +580,7 @@ def ingest_exchange(ctx, exchange_name, data_frequency, start, end,
|
||||
|
||||
exchange_bundle = ExchangeBundle(exchange_name)
|
||||
|
||||
click.echo('Ingesting exchange bundle {}...'.format(exchange_name),
|
||||
click.echo('Trying to ingest exchange bundle {}...'.format(exchange_name),
|
||||
sys.stdout)
|
||||
exchange_bundle.ingest(
|
||||
data_frequency=data_frequency,
|
||||
|
||||
@@ -199,12 +199,8 @@ class Exchange:
|
||||
)
|
||||
assets.append(asset)
|
||||
|
||||
except SymbolNotFoundOnExchange:
|
||||
log.debug(
|
||||
'skipping non-existent market {} {}'.format(
|
||||
self.name, symbol
|
||||
)
|
||||
)
|
||||
except SymbolNotFoundOnExchange as e:
|
||||
log.warn(e)
|
||||
return assets
|
||||
|
||||
def get_asset(self, symbol, data_frequency=None, is_exchange_symbol=False,
|
||||
|
||||
@@ -22,7 +22,7 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
|
||||
PricingDataNotLoadedError, DataCorruptionError, PricingDataValueError
|
||||
from catalyst.exchange.utils.bundle_utils import range_in_bundle, \
|
||||
get_bcolz_chunk, get_df_from_arrays, get_assets
|
||||
from catalyst.exchange.utils.datetime_utils import get_delta, get_start_dt, \
|
||||
from catalyst.exchange.utils.datetime_utils import get_start_dt, \
|
||||
get_period_label, get_month_start_end, get_year_start_end
|
||||
from catalyst.exchange.utils.exchange_utils import get_exchange_folder, \
|
||||
save_exchange_symbols, mixin_market_params, get_catalyst_symbol
|
||||
@@ -232,12 +232,12 @@ class ExchangeBundle:
|
||||
|
||||
problem = '{name} ({start_dt} to {end_dt}) has empty ' \
|
||||
'periods: {dates}'.format(
|
||||
name=asset.symbol,
|
||||
start_dt=asset.start_date.strftime(
|
||||
DATE_TIME_FORMAT),
|
||||
end_dt=end_dt.strftime(DATE_TIME_FORMAT),
|
||||
dates=[date.strftime(
|
||||
DATE_TIME_FORMAT) for date in dates])
|
||||
name=asset.symbol,
|
||||
start_dt=asset.start_date.strftime(
|
||||
DATE_TIME_FORMAT),
|
||||
end_dt=end_dt.strftime(DATE_TIME_FORMAT),
|
||||
dates=[date.strftime(
|
||||
DATE_TIME_FORMAT) for date in dates])
|
||||
|
||||
if empty_rows_behavior == 'warn':
|
||||
log.warn(problem)
|
||||
@@ -286,12 +286,12 @@ class ExchangeBundle:
|
||||
|
||||
problem = '{name} ({start_dt} to {end_dt}) has {threshold} ' \
|
||||
'identical close values on: {dates}'.format(
|
||||
name=asset.symbol,
|
||||
start_dt=asset.start_date.strftime(DATE_TIME_FORMAT),
|
||||
end_dt=end_dt.strftime(DATE_TIME_FORMAT),
|
||||
threshold=threshold,
|
||||
dates=[pd.to_datetime(date).strftime(DATE_TIME_FORMAT)
|
||||
for date in dates])
|
||||
name=asset.symbol,
|
||||
start_dt=asset.start_date.strftime(DATE_TIME_FORMAT),
|
||||
end_dt=end_dt.strftime(DATE_TIME_FORMAT),
|
||||
threshold=threshold,
|
||||
dates=[pd.to_datetime(date).strftime(DATE_TIME_FORMAT)
|
||||
for date in dates])
|
||||
|
||||
problems.append(problem)
|
||||
|
||||
@@ -458,7 +458,7 @@ class ExchangeBundle:
|
||||
last_entry = None
|
||||
|
||||
if start is None or \
|
||||
(earliest_trade is not None and earliest_trade > start):
|
||||
(earliest_trade is not None and earliest_trade > start):
|
||||
start = earliest_trade
|
||||
|
||||
if last_entry is not None and (end is None or end > last_entry):
|
||||
@@ -598,16 +598,41 @@ class ExchangeBundle:
|
||||
# we want to give an end_date far in time
|
||||
writer = self.get_writer(start_dt, end_dt, data_frequency)
|
||||
if show_breakdown:
|
||||
for asset in chunks:
|
||||
if chunks:
|
||||
for asset in chunks:
|
||||
with maybe_show_progress(
|
||||
chunks[asset],
|
||||
show_progress,
|
||||
label='Ingesting {frequency} price data for '
|
||||
'{symbol} on {exchange}'.format(
|
||||
exchange=self.exchange_name,
|
||||
frequency=data_frequency,
|
||||
symbol=asset.symbol
|
||||
)) as it:
|
||||
for chunk in it:
|
||||
problems += self.ingest_ctable(
|
||||
asset=chunk['asset'],
|
||||
data_frequency=data_frequency,
|
||||
period=chunk['period'],
|
||||
writer=writer,
|
||||
empty_rows_behavior='strip',
|
||||
cleanup=True
|
||||
)
|
||||
else:
|
||||
all_chunks = list(chain.from_iterable(itervalues(chunks)))
|
||||
# We sort the chunks by end date to ingest most recent data first
|
||||
if all_chunks:
|
||||
all_chunks.sort(
|
||||
key=lambda chunk: pd.to_datetime(chunk['period'])
|
||||
)
|
||||
with maybe_show_progress(
|
||||
chunks[asset],
|
||||
all_chunks,
|
||||
show_progress,
|
||||
label='Ingesting {frequency} price data for '
|
||||
'{symbol} on {exchange}'.format(
|
||||
label='Ingesting {frequency} price data on '
|
||||
'{exchange}'.format(
|
||||
exchange=self.exchange_name,
|
||||
frequency=data_frequency,
|
||||
symbol=asset.symbol
|
||||
)) as it:
|
||||
)) as it:
|
||||
for chunk in it:
|
||||
problems += self.ingest_ctable(
|
||||
asset=chunk['asset'],
|
||||
@@ -617,30 +642,6 @@ class ExchangeBundle:
|
||||
empty_rows_behavior='strip',
|
||||
cleanup=True
|
||||
)
|
||||
else:
|
||||
all_chunks = list(chain.from_iterable(itervalues(chunks)))
|
||||
|
||||
# We sort the chunks by end date to ingest most recent data first
|
||||
all_chunks.sort(
|
||||
key=lambda chunk: pd.to_datetime(chunk['period'])
|
||||
)
|
||||
with maybe_show_progress(
|
||||
all_chunks,
|
||||
show_progress,
|
||||
label='Ingesting {frequency} price data on '
|
||||
'{exchange}'.format(
|
||||
exchange=self.exchange_name,
|
||||
frequency=data_frequency,
|
||||
)) as it:
|
||||
for chunk in it:
|
||||
problems += self.ingest_ctable(
|
||||
asset=chunk['asset'],
|
||||
data_frequency=data_frequency,
|
||||
period=chunk['period'],
|
||||
writer=writer,
|
||||
empty_rows_behavior='strip',
|
||||
cleanup=True
|
||||
)
|
||||
|
||||
if show_report and len(problems) > 0:
|
||||
log.info('problems during ingestion:{}\n'.format(
|
||||
|
||||
Reference in New Issue
Block a user