mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 02:40:51 +08:00
Optimize the existing data filter to filter by asset.
This commit is contained in:
@@ -146,9 +146,24 @@ class ExchangeBundle:
|
||||
|
||||
return self._writer
|
||||
|
||||
def check_data_exists(self, assets, start, end):
|
||||
has_data = True
|
||||
def filter_existing_assets(self, assets, start, end):
|
||||
"""
|
||||
For each asset, get the close on the start and end dates of the chunk.
|
||||
If the data exists, the chunk ingestion is complete.
|
||||
If any data is missing we ingest the data.
|
||||
|
||||
:param assets: list[TradingPair]
|
||||
The assets is scope.
|
||||
:param start:
|
||||
The chunk start date.
|
||||
:param end:
|
||||
The chunk end date.
|
||||
:return: list[TradingPair]
|
||||
The assets missing from the bundle
|
||||
"""
|
||||
missing_assets = []
|
||||
for asset in assets:
|
||||
has_data = True
|
||||
if has_data and self.reader is not None:
|
||||
try:
|
||||
start_close = self.reader.get_value(
|
||||
@@ -170,7 +185,10 @@ class ExchangeBundle:
|
||||
else:
|
||||
has_data = False
|
||||
|
||||
return has_data
|
||||
if not has_data:
|
||||
missing_assets.append(asset)
|
||||
|
||||
return missing_assets
|
||||
|
||||
def ingest(self):
|
||||
symbols = []
|
||||
@@ -233,21 +251,21 @@ class ExchangeBundle:
|
||||
if asset.start_date <= chunk_end:
|
||||
chunk_assets.append(asset)
|
||||
|
||||
if self.check_data_exists(
|
||||
chunk_assets, chunk_start, chunk_end):
|
||||
missing_assets = self.filter_existing_assets(
|
||||
chunk_assets, chunk_start, chunk_end)
|
||||
|
||||
if len(missing_assets) == 0:
|
||||
log.debug('the data chunk already exists')
|
||||
continue
|
||||
|
||||
# TODO: ensure correct behavior for assets starting in the chunk
|
||||
candles = fetch_candles_chunk(
|
||||
exchange=self.exchange,
|
||||
assets=chunk_assets,
|
||||
assets=missing_assets,
|
||||
data_frequency=frequency,
|
||||
end_dt=chunk_end,
|
||||
bar_count=chunk['bar_count']
|
||||
)
|
||||
log.debug(
|
||||
'requests counter {}'.format(self.exchange.request_cpt))
|
||||
|
||||
num_candles = 0
|
||||
data = []
|
||||
@@ -256,7 +274,7 @@ class ExchangeBundle:
|
||||
if not asset_candles:
|
||||
log.debug(
|
||||
'no data: {symbols} on {exchange}, date {end}'.format(
|
||||
symbols=chunk_assets,
|
||||
symbols=missing_assets,
|
||||
exchange=self.exchange.name,
|
||||
end=chunk_end
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user