diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index 3304fcc1..80f6a312 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -530,7 +530,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): self.add_exposure_stats(minute_stats) print_df = pd.DataFrame(list(self.minute_stats)) - log.debug( + log.info( 'statistics for the last {stats_minutes} minutes:\n{stats}'.format( stats_minutes=self.stats_minutes, stats=get_pretty_stats( diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index b04b676c..d85f417e 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -13,7 +13,8 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader from catalyst.exchange.bundle_utils import get_ffill_candles, range_in_bundle, \ get_bcolz_chunk, get_delta -from catalyst.exchange.exchange_errors import EmptyValuesInBundleError +from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ + InvalidHistoryFrequencyError from catalyst.exchange.exchange_utils import get_exchange_folder from catalyst.utils.cli import maybe_show_progress from catalyst.utils.paths import ensure_directory @@ -106,8 +107,8 @@ class ExchangeBundle: except IOError: log.debug('no reader data found in {}'.format(input_dir)) else: - raise ValueError( - 'invalid frequency {}'.format(data_frequency) + raise InvalidHistoryFrequencyError( + frequency=data_frequency ) return self._readers[data_frequency] @@ -186,8 +187,8 @@ class ExchangeBundle: end_session=end_session ) else: - raise ValueError( - 'invalid frequency {}'.format(data_frequency) + raise InvalidHistoryFrequencyError( + frequency=data_frequency ) return self._writers[key] @@ -326,7 +327,7 @@ class ExchangeBundle: """ def ingest_ctable(self, asset, data_frequency, period, writer, - verify=False, cleanup=False): + empty_rows_behavior='warn', cleanup=False): """ Merge a ctable bundle chunk into the main bundle for the exchange. @@ -334,7 +335,7 @@ class ExchangeBundle: :param data_frequency: str :param period: str :param writer: - :param verify: bool + :param empty_rows_behavior: str Ensure that the bundle does not have any missing data. :param cleanup: bool @@ -350,22 +351,41 @@ class ExchangeBundle: period=period ) - reader = BcolzMinuteBarReader(path) - - start = reader.first_trading_day - - # end = reader.last_available_dt - end = reader.last_available_dt - - periods = self.calendar.minutes_in_range(start, end) - sid = asset.sid - arrays = reader.load_raw_arrays( - fields=['open', 'high', 'low', 'close', 'volume'], - start_dt=start, - end_dt=end, - sids=[sid] - ) + if data_frequency == 'minute': + reader = BcolzMinuteBarReader(path) + + start = reader.first_trading_day + end = reader.last_available_dt + + periods = self.calendar.minutes_in_range(start, end) + + arrays = reader.load_raw_arrays( + fields=['open', 'high', 'low', 'close', 'volume'], + start_dt=start, + end_dt=end, + sids=[sid] + ) + + elif data_frequency == 'daily': + reader = BcolzDailyBarReader(path) + + start = writer._start_session + end = writer._end_session + + periods = self.calendar.sessions_in_range(start, end) + + # Note that the parameters convention is totally different + # from the minute reader. + arrays = reader.load_raw_arrays( + columns=['open', 'high', 'low', 'close', 'volume'], + start_date=start, + end_date=end, + assets=[asset] + ) + + else: + raise InvalidHistoryFrequencyError(frequency=data_frequency) ohlcv = dict( open=arrays[0].flatten(), @@ -380,7 +400,7 @@ class ExchangeBundle: index=periods ) - if verify: + if empty_rows_behavior is not 'ignore': nan_rows = df[df.isnull().T.any().T].index if len(nan_rows) > 0: @@ -404,19 +424,22 @@ class ExchangeBundle: dates.append(pd.to_datetime(nan_rows.values[-1])) name = path.split('/')[-1] - log.warn( - '\n{name} with end minute {end_minute} has empty rows ' - 'in ranges: {dates}'.format( + if empty_rows_behavior == 'warn': + log.warn( + '\n{name} with end minute {end_minute} has empty rows ' + 'in ranges: {dates}'.format( + name=name, + end_minute=asset.end_minute, + dates=dates + ) + ) + + elif empty_rows_behavior == 'raise': + raise EmptyValuesInBundleError( name=name, end_minute=asset.end_minute, dates=dates ) - ) - raise EmptyValuesInBundleError( - name=name, - end_minute=asset.end_minute, - dates=dates - ) data = [] if not df.empty: @@ -458,34 +481,53 @@ class ExchangeBundle: periods = [] dt = sessions[0] while dt <= sessions[-1]: - period = '{}-{}'.format(dt.year, dt.month) + period = '{}-{}'.format(dt.year, dt.month) \ + if data_frequency == 'minute' else '{}'.format(dt.year) if period not in periods: periods.append(period) - month_range = calendar.monthrange(dt.year, dt.month) - month_start = pd.to_datetime( - datetime(dt.year, dt.month, 1, 0, 0, 0, 0), - utc=True) + if data_frequency == 'minute': + month_range = calendar.monthrange(dt.year, dt.month) + period_start = pd.to_datetime( + datetime(dt.year, dt.month, 1, 0, 0, 0, 0), + utc=True) - month_end = pd.to_datetime( - datetime( - dt.year, dt.month, month_range[1], 23, 59, 0, 0), - utc=True - ) + period_end = pd.to_datetime( + datetime( + dt.year, dt.month, month_range[1], 23, 59, 0, + 0), + utc=True + ) - if month_end > asset_end: - month_end = asset_end + elif data_frequency == 'daily': + period_start = pd.to_datetime( + datetime(dt.year, 1, 1, 0, 0, 0, 0), + utc=True) + + period_end = pd.to_datetime( + datetime( + dt.year, 12, 31, 23, 59, 0, 0), + utc=True + ) + else: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + + if period_end > asset_end: + period_end = asset_end has_data = \ - range_in_bundle(asset, month_start, month_end, reader) + range_in_bundle(asset, period_start, period_end, + reader) if not has_data: log.debug('adding period: {}'.format(period)) chunks.append( dict( asset=asset, - period_end=month_end, + period_end=period_end, period=period ) ) @@ -532,5 +574,6 @@ class ExchangeBundle: asset=chunk['asset'], data_frequency=data_frequency, period=chunk['period'], - writer=writer + writer=writer, + empty_rows_behavior='ignore' ) diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 0dd86249..a0738dab 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -54,15 +54,15 @@ class ExchangeBundleTestCase: def test_ingest_daily(self): exchange_name = 'bitfinex' - start = pd.to_datetime('2017-09-01', utc=True) - end = pd.Timestamp.utcnow() + start = pd.to_datetime('2017-01-01', utc=True) + end = pd.to_datetime('2017-09-30', utc=True) exchange_bundle = ExchangeBundle(get_exchange(exchange_name)) log.info('ingesting exchange bundle {}'.format(exchange_name)) exchange_bundle.ingest( data_frequency='daily', - include_symbols='neo_btc', + include_symbols='neo_btc,bch_btc,eth_btc', exclude_symbols=None, start=start, end=end, @@ -71,19 +71,19 @@ class ExchangeBundleTestCase: pass def test_merge_ctables(self): - exchange_name = 'bitfinex' + exchange_name = 'poloniex' data_frequency = 'minute' exchange = get_exchange(exchange_name) - # asset = exchange.get_asset('gno_btc') + asset = exchange.get_asset('gno_btc') + + start = pd.to_datetime('2017-5-1', utc=True) + end = pd.to_datetime('2017-5-31', utc=True) + + # asset = exchange.get_asset('neo_btc') # - # start = pd.to_datetime('2017-5-1', utc=True) - # end = pd.to_datetime('2017-5-31', utc=True) - - asset = exchange.get_asset('neo_btc') - - start = pd.to_datetime('2017-9-1', utc=True) - end = pd.to_datetime('2017-9-30', utc=True) + # start = pd.to_datetime('2017-9-1', utc=True) + # end = pd.to_datetime('2017-9-30', utc=True) exchange_bundle = ExchangeBundle(exchange) @@ -91,9 +91,9 @@ class ExchangeBundleTestCase: exchange_bundle.ingest_ctable( asset=asset, data_frequency=data_frequency, - period='2017-9', + period='2017-5', writer=writer, - verify=True + empty_rows_behavior='raise' ) pass @@ -111,7 +111,7 @@ class ExchangeBundleTestCase: exchange_name=exchange_name, symbol=asset.symbol, data_frequency=data_frequency, - period='2017-5' + period='2017-5', ) reader = BcolzMinuteBarReader(path) pass