From bdbaad1c913f132e57df5270ecb4984072ddead2 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Sat, 14 Oct 2017 02:06:26 -0400 Subject: [PATCH] Improvements and fixes to the ingestion component --- catalyst/examples/simple_loop.py | 6 ++-- catalyst/exchange/bundle_utils.py | 12 ++++++- catalyst/exchange/exchange_bundle.py | 52 ++++++++++++++++++++++------ tests/exchange/test_bundle.py | 5 +-- 4 files changed, 58 insertions(+), 17 deletions(-) diff --git a/catalyst/examples/simple_loop.py b/catalyst/examples/simple_loop.py index 63501037..50301281 100644 --- a/catalyst/examples/simple_loop.py +++ b/catalyst/examples/simple_loop.py @@ -6,7 +6,7 @@ from catalyst.api import symbol def initialize(context): print('initializing') - context.asset = symbol('btc_usdt') + context.asset = symbol('gno_btc') def handle_data(context, data): @@ -18,8 +18,8 @@ def handle_data(context, data): run_algorithm( capital_base=250, - start=pd.to_datetime('2017-1-1', utc=True), - end=pd.to_datetime('2017-1-31', utc=True), + start=pd.to_datetime('2017-5-1', utc=True), + end=pd.to_datetime('2017-5-31', utc=True), data_frequency='minute', initialize=initialize, handle_data=handle_data, diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index 5a92669d..14a0870c 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,5 +1,5 @@ -import gzip import tarfile +import shutil import requests from datetime import timedelta, datetime @@ -221,6 +221,16 @@ def get_ffill_candles(candles, bar_count, end_dt, data_frequency, def range_in_bundle(asset, start_dt, end_dt, reader): + """ + Evaluate whether price data of an asset is included has been ingested in + the exchange bundle for the given date range. + + :param asset: + :param start_dt: + :param end_dt: + :param reader: + :return: + """ has_data = True if has_data and reader is not None: try: diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index bec8a1ca..e6163580 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,5 +1,7 @@ import calendar import os +import shutil + import pytz from datetime import timedelta, datetime @@ -50,17 +52,20 @@ class ExchangeBundle: else: return self.exchange.get_assets() - def get_adj_dates(self, start, end, assets): - now = pd.Timestamp.utcnow() - if end is None or end > now: - log.debug('adjusting the end date to now {}'.format(now)) - end = now + def get_adj_dates(self, start, end, assets, data_frequency): earliest_trade = None + last_entry = None for asset in assets: if earliest_trade is None or earliest_trade > asset.start_date: earliest_trade = asset.start_date + end_asset = asset.end_minute if data_frequency == 'minute' else \ + asset.end_daily + if end_asset is not None and \ + (last_entry is None or end_asset > last_entry): + last_entry = end_asset + if start is None or earliest_trade > start: log.debug( 'adjusting start date to earliest trade date found {}'.format( @@ -68,6 +73,10 @@ class ExchangeBundle: )) start = earliest_trade + if end is None or (last_entry is not None and end > last_entry): + log.debug('adjusting the end date to now {}'.format(last_entry)) + end = last_entry + if start >= end: raise ValueError('start date cannot be after end date') @@ -328,7 +337,7 @@ class ExchangeBundle: """ def ingest_ctable(self, asset, data_frequency, period, writer, - verify=False): + verify=False, cleanup=False): """ Merge a ctable bundle chunk into the main bundle for the exchange. @@ -336,7 +345,12 @@ class ExchangeBundle: :param data_frequency: str :param period: str :param writer: - :param verify: + :param verify: bool + Ensure that the bundle does not have any missing data. + + :param cleanup: bool + Remove the temp bundle directory after ingestion. + :return: """ @@ -390,10 +404,15 @@ class ExchangeBundle: data = [] if not df.empty: df.sort_index(inplace=True) - data.append((sid, df)) self._write(data, writer, data_frequency) + + if cleanup: + log.debug('removing bundle folder following ' + 'ingestion: {}'.format(path)) + shutil.rmtree(path) + return path def ingest(self, data_frequency, include_symbols=None, @@ -412,15 +431,22 @@ class ExchangeBundle: """ assets = self.get_assets(include_symbols, exclude_symbols) - start, end = self.get_adj_dates(start, end, assets) + start, end = self.get_adj_dates(start, end, assets, data_frequency) reader = self.get_reader(data_frequency) chunks = [] - periods = [] for asset in assets: - asset_start, asset_end = self.get_adj_dates(start, end, [asset]) + try: + asset_start, asset_end = \ + self.get_adj_dates(start, end, [asset], data_frequency) + + except ValueError: + dt += timedelta(days=1) + continue + sessions = self.calendar.sessions_in_range(asset_start, asset_end) + periods = [] dt = sessions[0] while dt <= sessions[-1]: period = '{}-{}'.format(dt.year, dt.month) @@ -438,6 +464,10 @@ class ExchangeBundle: datetime(dt.year, dt.month, month_range[1] - 1, 23, 59, 0, 0), utc=True) + + if month_end > asset_end: + month_end = asset_end + has_data = \ range_in_bundle(asset, month_start, month_end, reader) diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 29b843aa..f2158e87 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -21,7 +21,8 @@ class ExchangeBundleTestCase: log.info('ingesting exchange bundle {}'.format(exchange_name)) exchange_bundle.ingest( data_frequency='minute', - include_symbols='btc_usdt', + include_symbols='gno_btc', + # include_symbols=None, exclude_symbols=None, start=start, end=end, @@ -72,7 +73,7 @@ class ExchangeBundleTestCase: data_frequency = 'minute' exchange = get_exchange(exchange_name) - asset = exchange.get_asset('btc_usdt') + asset = exchange.get_asset('gno_btc') start = pd.to_datetime('2017-5-1', utc=True) end = pd.to_datetime('2017-5-31', utc=True)