From c658d15fcbd806ca1863af642dadb147f0a91248 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Fri, 13 Oct 2017 00:50:25 -0400 Subject: [PATCH] Unit testing ingestion of bundles logic --- .../buy_low_sell_high_neo_with_interface.py | 14 +- catalyst/exchange/exchange_bundle.py | 124 +++++++++++++----- tests/exchange/test_bundle.py | 32 ++++- 3 files changed, 129 insertions(+), 41 deletions(-) diff --git a/catalyst/examples/buy_low_sell_high_neo_with_interface.py b/catalyst/examples/buy_low_sell_high_neo_with_interface.py index b70e451a..7037afad 100644 --- a/catalyst/examples/buy_low_sell_high_neo_with_interface.py +++ b/catalyst/examples/buy_low_sell_high_neo_with_interface.py @@ -33,6 +33,13 @@ def initialize(context): def _handle_data(context, data): + price = data.current(context.asset, 'close') + log.info('got price {price}'.format(price=price)) + + if price is None: + log.warn('no pricing data') + return + prices = data.history( context.asset, fields='price', @@ -55,13 +62,6 @@ def _handle_data(context, data): cash = context.portfolio.cash log.info('base currency available: {cash}'.format(cash=cash)) - price = data.current(context.asset, 'close') - log.info('got price {price}'.format(price=price)) - - if price is None: - log.warn('no pricing data') - return - record(price=price) orders = get_open_orders(context.asset) diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 15022384..a843a777 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,6 +1,8 @@ import os +import time from datetime import timedelta +import bcolz import pandas as pd from logbook import Logger, DEBUG, INFO @@ -209,7 +211,38 @@ class ExchangeBundle: return missing_assets - @deprecated + def _write(self, data, writer, data_frequency): + """ + Write data to the writer + + :param df: + :param writer: + :return: + """ + try: + writer.write( + data=data, + show_progress=False, + invalid_data_behavior='raise' + ) + except BcolzMinuteOverlappingData as e: + log.warn('chunk already exists: {}'.format(e)) + except Exception as e: + log.warn('error when writing data: {}, trying again'.format(e)) + + # This is workaround, there is an issue with empty + # session_label when using a newly created writer + del self._writers[data_frequency] + + # TODO: these are the dates of the chunk, not the job + writer = self.get_writer(writer._start_session, + writer._end_session, data_frequency) + writer.write( + data=data, + show_progress=False, + invalid_data_behavior='raise' + ) + def ingest_chunk(self, bar_count, end_dt, data_frequency, asset, writer, previous_candle=dict()): """ @@ -264,6 +297,7 @@ class ExchangeBundle: index=all_dates, columns=['open', 'high', 'low', 'close', 'volume'] ) + if not df.empty: df.sort_index(inplace=True) @@ -272,41 +306,67 @@ class ExchangeBundle: data.append((sid, df)) - try: - log.debug( - 'writing {num_candles} candles for {bar_count} bars' - 'ending {end}'.format( - num_candles=num_candles, - bar_count=bar_count, - end=end_dt - ) - ) - - writer.write( - data=data, - show_progress=False, - invalid_data_behavior='raise' - ) - except BcolzMinuteOverlappingData as e: - log.warn('chunk already exists: {}'.format(e)) - except Exception as e: - log.warn('error when writing data: {}, trying again'.format(e)) - - # This is workaround, there is an issue with empty - # session_label when using a newly created writer - del self._writers[data_frequency] - - # TODO: these are the dates of the chunk, not the job - start_dt = get_start_dt(end_dt, bar_count, data_frequency) - writer = self.get_writer(start_dt, end_dt, data_frequency) - writer.write( - data=data, - show_progress=False, - invalid_data_behavior='raise' + log.debug( + 'writing {num_candles} candles for {bar_count} bars' + 'ending {end}'.format( + num_candles=num_candles, + bar_count=bar_count, + end=end_dt ) + ) + self._write(data, writer, data_frequency) return data + def ingest_ctable(self, asset, data_frequency, path): + start_time = time.time() + + reader = BcolzMinuteBarReader(path) + + start = reader.first_trading_day + end = reader.last_available_dt + + open_calendar = get_calendar('OPEN') + periods = open_calendar.minutes_in_range(start, end) + + sid = 284 + arrays = reader.load_raw_arrays( + fields=['open', 'high', 'low', 'close', 'volume'], + start_dt=start, + end_dt=end, + sids=[sid] + ) + + ohlcv = dict( + open=arrays[0].flatten(), + high=arrays[1].flatten(), + low=arrays[2].flatten(), + close=arrays[3].flatten(), + volume=arrays[4].flatten() + ) + + df = pd.DataFrame( + data=ohlcv, + index=periods + ) + + data = [] + if not df.empty: + df.sort_index(inplace=True) + + data.append((sid, df)) + + writer = self.get_writer(start, end, data_frequency) + + self._write(data, writer, data_frequency) + + end_time = time.time() + delta_time = end_time - start_time + + log.info('time elapsed {}'.format(delta_time)) + + pass + def ingest(self, data_frequency, include_symbols=None, exclude_symbols=None, start=None, end=None, show_progress=True, environ=os.environ): diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 47ebf30d..78e7fd31 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -1,6 +1,8 @@ -from datetime import timedelta +from datetime import timedelta, time from logging import Logger +import bcolz +from toolz.itertoolz import join as joinz import pandas as pd from catalyst.exchange.exchange_bundle import ExchangeBundle @@ -22,7 +24,7 @@ class ExchangeBundleTestCase: log.info('ingesting exchange bundle {}'.format(exchange_name)) exchange_bundle.ingest( data_frequency='minute', - include_symbols='neo_btc', + include_symbols='bcc_btc', exclude_symbols=None, start=start, end=end, @@ -67,3 +69,29 @@ class ExchangeBundleTestCase: show_progress=True ) pass + + def test_merge_ctables(self): + exchange_name = 'bitfinex' + + root = '/Users/fredfortier/.catalyst/data/exchanges/bitfinex/temp_bundles' + path = '00/02/000284.bcolz' + + august = '{}/{}'.format( + root, 'poloniex-minute-btc_usdt-2017-8' + ) + exchange = get_exchange(exchange_name) + asset = exchange.get_asset('btc_usd') + + exchange_bundle = ExchangeBundle(exchange) + exchange_bundle.ingest_ctable( + asset=asset, + data_frequency='minute', + path=august + ) + + september = '{}/{}/{}'.format( + root, 'poloniex-minute-btc_usdt-2017-9', path + ) + zseptember = bcolz.open(september, mode='a') + + pass