From 9d7a35658b8971f4079c81595bf27324ee698656 Mon Sep 17 00:00:00 2001 From: EmbarAlmog Date: Tue, 20 Mar 2018 16:12:13 +0200 Subject: [PATCH] ENH: when ingesting data of non-existing pair it is now throwing log warning. --- catalyst/__main__.py | 2 +- catalyst/exchange/exchange.py | 8 +-- catalyst/exchange/exchange_bundle.py | 89 ++++++++++++++-------------- 3 files changed, 48 insertions(+), 51 deletions(-) diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 39b5e277..1b02a6d0 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -580,7 +580,7 @@ def ingest_exchange(ctx, exchange_name, data_frequency, start, end, exchange_bundle = ExchangeBundle(exchange_name) - click.echo('Ingesting exchange bundle {}...'.format(exchange_name), + click.echo('Trying to ingest exchange bundle {}...'.format(exchange_name), sys.stdout) exchange_bundle.ingest( data_frequency=data_frequency, diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index e5bea0fe..40238b90 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -199,12 +199,8 @@ class Exchange: ) assets.append(asset) - except SymbolNotFoundOnExchange: - log.debug( - 'skipping non-existent market {} {}'.format( - self.name, symbol - ) - ) + except SymbolNotFoundOnExchange as e: + log.warn(e) return assets def get_asset(self, symbol, data_frequency=None, is_exchange_symbol=False, diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 569fa6e5..a9138b23 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -22,7 +22,7 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ PricingDataNotLoadedError, DataCorruptionError, PricingDataValueError from catalyst.exchange.utils.bundle_utils import range_in_bundle, \ get_bcolz_chunk, get_df_from_arrays, get_assets -from catalyst.exchange.utils.datetime_utils import get_delta, get_start_dt, \ +from catalyst.exchange.utils.datetime_utils import get_start_dt, \ get_period_label, get_month_start_end, get_year_start_end from catalyst.exchange.utils.exchange_utils import get_exchange_folder, \ save_exchange_symbols, mixin_market_params, get_catalyst_symbol @@ -232,12 +232,12 @@ class ExchangeBundle: problem = '{name} ({start_dt} to {end_dt}) has empty ' \ 'periods: {dates}'.format( - name=asset.symbol, - start_dt=asset.start_date.strftime( - DATE_TIME_FORMAT), - end_dt=end_dt.strftime(DATE_TIME_FORMAT), - dates=[date.strftime( - DATE_TIME_FORMAT) for date in dates]) + name=asset.symbol, + start_dt=asset.start_date.strftime( + DATE_TIME_FORMAT), + end_dt=end_dt.strftime(DATE_TIME_FORMAT), + dates=[date.strftime( + DATE_TIME_FORMAT) for date in dates]) if empty_rows_behavior == 'warn': log.warn(problem) @@ -286,12 +286,12 @@ class ExchangeBundle: problem = '{name} ({start_dt} to {end_dt}) has {threshold} ' \ 'identical close values on: {dates}'.format( - name=asset.symbol, - start_dt=asset.start_date.strftime(DATE_TIME_FORMAT), - end_dt=end_dt.strftime(DATE_TIME_FORMAT), - threshold=threshold, - dates=[pd.to_datetime(date).strftime(DATE_TIME_FORMAT) - for date in dates]) + name=asset.symbol, + start_dt=asset.start_date.strftime(DATE_TIME_FORMAT), + end_dt=end_dt.strftime(DATE_TIME_FORMAT), + threshold=threshold, + dates=[pd.to_datetime(date).strftime(DATE_TIME_FORMAT) + for date in dates]) problems.append(problem) @@ -458,7 +458,7 @@ class ExchangeBundle: last_entry = None if start is None or \ - (earliest_trade is not None and earliest_trade > start): + (earliest_trade is not None and earliest_trade > start): start = earliest_trade if last_entry is not None and (end is None or end > last_entry): @@ -598,16 +598,41 @@ class ExchangeBundle: # we want to give an end_date far in time writer = self.get_writer(start_dt, end_dt, data_frequency) if show_breakdown: - for asset in chunks: + if chunks: + for asset in chunks: + with maybe_show_progress( + chunks[asset], + show_progress, + label='Ingesting {frequency} price data for ' + '{symbol} on {exchange}'.format( + exchange=self.exchange_name, + frequency=data_frequency, + symbol=asset.symbol + )) as it: + for chunk in it: + problems += self.ingest_ctable( + asset=chunk['asset'], + data_frequency=data_frequency, + period=chunk['period'], + writer=writer, + empty_rows_behavior='strip', + cleanup=True + ) + else: + all_chunks = list(chain.from_iterable(itervalues(chunks))) + # We sort the chunks by end date to ingest most recent data first + if all_chunks: + all_chunks.sort( + key=lambda chunk: pd.to_datetime(chunk['period']) + ) with maybe_show_progress( - chunks[asset], + all_chunks, show_progress, - label='Ingesting {frequency} price data for ' - '{symbol} on {exchange}'.format( + label='Ingesting {frequency} price data on ' + '{exchange}'.format( exchange=self.exchange_name, frequency=data_frequency, - symbol=asset.symbol - )) as it: + )) as it: for chunk in it: problems += self.ingest_ctable( asset=chunk['asset'], @@ -617,30 +642,6 @@ class ExchangeBundle: empty_rows_behavior='strip', cleanup=True ) - else: - all_chunks = list(chain.from_iterable(itervalues(chunks))) - - # We sort the chunks by end date to ingest most recent data first - all_chunks.sort( - key=lambda chunk: pd.to_datetime(chunk['period']) - ) - with maybe_show_progress( - all_chunks, - show_progress, - label='Ingesting {frequency} price data on ' - '{exchange}'.format( - exchange=self.exchange_name, - frequency=data_frequency, - )) as it: - for chunk in it: - problems += self.ingest_ctable( - asset=chunk['asset'], - data_frequency=data_frequency, - period=chunk['period'], - writer=writer, - empty_rows_behavior='strip', - cleanup=True - ) if show_report and len(problems) > 0: log.info('problems during ingestion:{}\n'.format(