diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 8dad4123..890a7bbd 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -146,9 +146,24 @@ class ExchangeBundle: return self._writer - def check_data_exists(self, assets, start, end): - has_data = True + def filter_existing_assets(self, assets, start, end): + """ + For each asset, get the close on the start and end dates of the chunk. + If the data exists, the chunk ingestion is complete. + If any data is missing we ingest the data. + + :param assets: list[TradingPair] + The assets in scope. + :param start: + The chunk start date. + :param end: + The chunk end date. + :return: list[TradingPair] + The assets missing from the bundle + """ + missing_assets = [] for asset in assets: + has_data = True if has_data and self.reader is not None: try: start_close = self.reader.get_value( @@ -170,7 +185,10 @@ class ExchangeBundle: else: has_data = False - return has_data + if not has_data: + missing_assets.append(asset) + + return missing_assets def ingest(self): symbols = [] @@ -233,21 +251,21 @@ class ExchangeBundle: if asset.start_date <= chunk_end: chunk_assets.append(asset) - if self.check_data_exists( - chunk_assets, chunk_start, chunk_end): + missing_assets = self.filter_existing_assets( + chunk_assets, chunk_start, chunk_end) + + if len(missing_assets) == 0: log.debug('the data chunk already exists') continue # TODO: ensure correct behavior for assets starting in the chunk candles = fetch_candles_chunk( exchange=self.exchange, - assets=chunk_assets, + assets=missing_assets, data_frequency=frequency, end_dt=chunk_end, bar_count=chunk['bar_count'] ) - log.debug( - 'requests counter {}'.format(self.exchange.request_cpt)) num_candles = 0 data = [] @@ -256,7 +274,7 @@ class ExchangeBundle: if not asset_candles: log.debug( 'no data: {symbols} on {exchange}, date {end}'.format( - symbols=chunk_assets, + symbols=missing_assets, 
exchange=self.exchange.name, end=chunk_end )