From c52653c84e48c66fc5dca42fdc15c7dc7dc3232f Mon Sep 17 00:00:00 2001 From: fredfortier Date: Fri, 13 Oct 2017 21:00:47 -0400 Subject: [PATCH] Tested ingestion of minute data with a single market --- catalyst/examples/simple_loop.py | 30 ++++++++++++++ catalyst/exchange/bundle_utils.py | 2 +- catalyst/exchange/exchange_bundle.py | 61 ++++++++++++++++++++-------- catalyst/exchange/exchange_errors.py | 13 ++++-- tests/exchange/test_bundle.py | 20 ++++----- 5 files changed, 93 insertions(+), 33 deletions(-) create mode 100644 catalyst/examples/simple_loop.py diff --git a/catalyst/examples/simple_loop.py b/catalyst/examples/simple_loop.py new file mode 100644 index 00000000..63501037 --- /dev/null +++ b/catalyst/examples/simple_loop.py @@ -0,0 +1,30 @@ +import pandas as pd + +from catalyst import run_algorithm +from catalyst.api import symbol + + +def initialize(context): + print('initializing') + context.asset = symbol('btc_usdt') + + +def handle_data(context, data): + print('handling bar: {}'.format(data.current_dt)) + + price = data.current(context.asset, 'close') + print('got price {price}'.format(price=price)) + + +run_algorithm( + capital_base=250, + start=pd.to_datetime('2017-1-1', utc=True), + end=pd.to_datetime('2017-1-31', utc=True), + data_frequency='minute', + initialize=initialize, + handle_data=handle_data, + analyze=None, + exchange_name='poloniex', + algo_namespace='simple_loop', + base_currency='btc' +) diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index 92af9b10..5a92669d 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -236,7 +236,7 @@ def range_in_bundle(asset, start_dt, end_dt, reader): if np.isnan(end_close): has_data = False - except Exception: + except Exception as e: has_data = False else: diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index b45f965e..bec8a1ca 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,9 +1,10 @@ import calendar import os -import time -from datetime import timedelta, datetime, date +import pytz +from datetime import timedelta, datetime import pandas as pd +import numpy as np from logbook import Logger, INFO from catalyst import get_calendar @@ -13,6 +14,7 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \ get_periods, range_in_bundle, get_bcolz_chunk +from catalyst.exchange.exchange_errors import EmptyValuesInBundleError from catalyst.exchange.exchange_utils import get_exchange_folder, \ get_exchange_bundles_folder from catalyst.utils.cli import maybe_show_progress @@ -325,8 +327,18 @@ class ExchangeBundle: :return: """ - def ingest_ctable(self, asset, data_frequency, period, writer): - start_time = time.time() + def ingest_ctable(self, asset, data_frequency, period, writer, + verify=False): + """ + Merge a ctable bundle chunk into the main bundle for the exchange. + + :param asset: TradingPair + :param data_frequency: str + :param period: str + :param writer: + :param verify: + :return: + """ path = get_bcolz_chunk( exchange_name=self.exchange.name, @@ -338,7 +350,10 @@ class ExchangeBundle: reader = BcolzMinuteBarReader(path) start = reader.first_trading_day - end = reader.last_available_dt + + # TODO: temp workaround, remove when the bundles are fixed + # end = reader.last_available_dt + end = reader.last_available_dt - timedelta(days=1) periods = self.calendar.minutes_in_range(start, end) @@ -363,20 +378,23 @@ class ExchangeBundle: index=periods ) + if verify: + nan_rows = df[df.isnull().T.any().T].index + if len(nan_rows) > 0: + raise EmptyValuesInBundleError( + path=path, + start=nan_rows[0], + end=nan_rows[-1] + ) + data = [] if not df.empty: - df.sort_index(inplace=True, ascending=False) + df.sort_index(inplace=True) data.append((sid, df)) self._write(data, writer, data_frequency) - - end_time = time.time() - delta_time = end_time - start_time - - log.info('time elapsed {}'.format(delta_time)) - - pass + return path def ingest(self, data_frequency, include_symbols=None, exclude_symbols=None, start=None, end=None, @@ -411,11 +429,19 @@ class ExchangeBundle: periods.append(period) month_range = calendar.monthrange(dt.year, dt.month) - month_start = date(dt.year, dt.month, month_range[0]) - month_end = date(dt.year, dt.month, month_range[1]) + month_start = pd.to_datetime( + datetime(dt.year, dt.month, 1, 0, 0, 0, 0), + utc=True) - if not range_in_bundle(asset, month_start, month_end, - reader): + # TODO: workaround, remove when bundles are fixed + month_end = pd.to_datetime( + datetime(dt.year, dt.month, month_range[1] - 1, + 23, 59, 0, 0), + utc=True) + has_data = \ + range_in_bundle(asset, month_start, month_end, reader) + + if not has_data: log.debug('adding period: {}'.format(period)) chunks.append( dict( @@ -445,4 +471,3 @@ class ExchangeBundle: period=chunk['period'], writer=writer ) - pass diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index fc54ec9b..aaed1b3a 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -1,16 +1,18 @@ import sys, traceback from catalyst.errors import ZiplineError + def silent_except_hook(exctype, excvalue, exctraceback): - if exctype in [ PricingDataBeforeTradingError, PricingDataNotLoadedError, - SymbolNotFoundOnExchange, ]: + if exctype in [PricingDataBeforeTradingError, PricingDataNotLoadedError, + SymbolNotFoundOnExchange, ]: fn = traceback.extract_tb(exctraceback)[-1][0] ln = traceback.extract_tb(exctraceback)[-1][1] print "Error traceback: {1} (line {2})\n" \ - "{0.__name__}: {3}".format(exctype, fn, ln, excvalue) + "{0.__name__}: {3}".format(exctype, fn, ln, excvalue) else: sys.__excepthook__(exctype, excvalue, exctraceback) + sys.excepthook = silent_except_hook @@ -168,6 +170,11 @@ class BundleNotFoundError(ZiplineError): 'See catalyst documentation for details.').strip() +class EmptyValuesInBundleError(ZiplineError): + msg = ('Found empty values in bundle {path} between ' + '{start} and {end}.').strip() + + class PricingDataBeforeTradingError(ZiplineError): msg = ('Pricing data for trading pairs {symbols} on exchange {exchange} ' 'starts on {first_trading_day}, but you are either trying to trade or ' diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 8f4e5a8f..29b843aa 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -1,8 +1,5 @@ -from datetime import timedelta, time from logging import Logger -import bcolz -from toolz.itertoolz import join as joinz import pandas as pd from catalyst.exchange.exchange_bundle import ExchangeBundle @@ -13,18 +10,18 @@ log = Logger('test_exchange_bundle') class ExchangeBundleTestCase: def test_ingest_minute(self): - exchange_name = 'bitfinex' + exchange_name = 'poloniex' # start = pd.to_datetime('2017-09-01', utc=True) - start = pd.to_datetime('2017-10-01', utc=True) - end = pd.to_datetime('2017-10-06', utc=True) + start = pd.to_datetime('2017-1-1', utc=True) + end = pd.to_datetime('2017-6-30', utc=True) exchange_bundle = ExchangeBundle(get_exchange(exchange_name)) log.info('ingesting exchange bundle {}'.format(exchange_name)) exchange_bundle.ingest( data_frequency='minute', - include_symbols='bcc_btc', + include_symbols='btc_usdt', exclude_symbols=None, start=start, end=end, @@ -77,8 +74,8 @@ class ExchangeBundleTestCase: exchange = get_exchange(exchange_name) asset = exchange.get_asset('btc_usdt') - start = pd.to_datetime('2017-09-01', utc=True) - end = pd.to_datetime('2017-09-06', utc=True) + start = pd.to_datetime('2017-5-1', utc=True) + end = pd.to_datetime('2017-5-31', utc=True) exchange_bundle = ExchangeBundle(exchange) @@ -86,7 +83,8 @@ class ExchangeBundleTestCase: exchange_bundle.ingest_ctable( asset=asset, data_frequency=data_frequency, - period='2017-9', - writer=writer + period='2017-5', + writer=writer, + verify=True ) pass