diff --git a/catalyst/curate/__init__.py b/catalyst/curate/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/curate/crypto_price_generator.py b/catalyst/curate/poloniex.py similarity index 74% rename from curate/crypto_price_generator.py rename to catalyst/curate/poloniex.py index 1e889952..7d96adbe 100644 --- a/curate/crypto_price_generator.py +++ b/catalyst/curate/poloniex.py @@ -6,17 +6,14 @@ import time import requests import logbook -import catalyst.data.bundles.core as bundles - DT_START = time.mktime(datetime(2010, 01, 01, 0, 0).timetuple()) -# DT_START = time.mktime(datetime(2017, 06, 13, 0, 0).timetuple()) # TODO: remove temp CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' CONN_RETRIES = 2 logbook.StderrHandler().push_application() log = logbook.Logger(__name__) -class PoloniexDataGenerator(object): +class PoloniexCurator(object): """ OHLCV data feed generator for crypto data. Based on Poloniex market data """ @@ -89,7 +86,8 @@ class PoloniexDataGenerator(object): log.debug('Getting data for %s' % currencyPair) csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv' start = self._get_start_date(csv_fn) - if (time.time() > start): # Only fetch data if more than 5min have passed since last fetch + # Only fetch data if more than 5min have passed since last fetch + if (time.time() > start): data = self.get_data(currencyPair, start) if data is not None: try: @@ -98,7 +96,14 @@ class PoloniexDataGenerator(object): for item in data: if item['date'] == 0: continue - csvwriter.writerow([item['date'], item['open'], item['high'], item['low'], item['close'], item['volume']]) + csvwriter.writerow([ + item['date'], + item['open'], + item['high'], + item['low'], + item['close'], + item['volume'], + ]) except Exception as e: log.error('Error opening %s' % csv_fn) log.exception(e) @@ -112,7 +117,8 @@ class PoloniexDataGenerator(object): def append_data(self): for currencyPair in self.currency_pairs: self.append_data_single_pair(currencyPair) - time.sleep(0.17) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe + # Rate limit is 6 calls per second, sleep 1sec/6 to be safe + time.sleep(0.17) ''' Returns a data frame for all pairs, or for the requests currency pair. @@ -130,57 +136,9 @@ class PoloniexDataGenerator(object): df['date']=pd.to_datetime(df['date'],unit='s') df.set_index('date', inplace=True) - #return df.loc[(df.index > start) & (df.index <= end)] return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)] if __name__ == '__main__': - pdg = PoloniexDataGenerator() - pdg.get_currency_pairs() - pdg.append_data() - - - -# from zipline.utils.calendars import get_calendar -# from zipline.data.us_equity_pricing import ( -# BcolzDailyBarWriter, -# BcolzDailyBarReader, -# ) - -# open_calendar = get_calendar('OPEN') - -# start_session = pd.Timestamp('2012-12-31', tz='UTC') -# end_session = pd.Timestamp('2015-01-01', tz='UTC') - -# file_path = 'test.bcolz' - -# writer = BcolzDailyBarWriter( -# file_path, -# open_calendar, -# start_session, -# end_session -# ) - -# index = open_calendar.schedule.index -# index = index[ -# (index.date >= start_session.date()) & -# (index.date <= end_session.date()) -# ] - -# data = pd.DataFrame( -# 0, -# index=index, -# columns=['open', 'high', 'low', 'close', 'volume'], -# ) - -# writer.write( -# [(0, data)], -# assets=[0], -# show_progress=True -# ) - -# print 'len(index):', len(index) - -# reader = BcolzDailyBarReader(file_path) - -# print 'first_rows:', reader._first_rows -# print 'last_rows:', reader._last_rows + pc = PoloniexCurator() + pc.get_currency_pairs() + pc.append_data() diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index 15acc567..c845752f 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -31,6 +31,8 @@ from ..utils.paths import ( data_root, ) from ..utils.deprecate import deprecated + +from catalyst.curate.poloniex import PoloniexCurator from catalyst.utils.calendars import get_calendar @@ -310,6 +312,8 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, return daily_bars + + five_min_bars = None try: # load five minute bars from csv cache five_min_bars = pd.read_csv( @@ -320,31 +324,48 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, date_parser=dateparse, ) five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s') + except (OSError, IOError): + # Otherwise load from Poloniex API + try: + pc = PoloniexCurator() + pc.append_data_single_pair(symbol) - # compute daily bars for open calendar - open_calendar = get_calendar('OPEN') - daily_bars = compute_daily_bars( - five_min_bars, - open_calendar.all_sessions, - ) + five_min_bars = pc.to_dataframe( + first_date, + last_date, + currencyPair=symbol, + ) + except (OSError, IOError, HTTPError): + logger.exception('Failed to new crypto benchmark returns') + raise - # filter daily bars to include first_date and last_date - daily_bars = daily_bars[ - (daily_bars.index >= (first_date - trading_day)) & - (daily_bars.index <= last_date) - ] + # compute daily bars for open calendar + open_calendar = get_calendar('OPEN') + daily_bars = compute_daily_bars( + five_min_bars, + open_calendar.all_sessions, + ) - # select close column and compute percent change between days - daily_close = daily_bars[['close']] - daily_close = daily_close.pct_change(1).iloc[1:] + # filter daily bars to include first_date and last_date + daily_bars = daily_bars[ + (daily_bars.index >= (first_date - trading_day)) & + (daily_bars.index <= last_date) + ] + # select close column and compute percent change between days + daily_close = daily_bars[['close']] + daily_close = daily_close.pct_change(1).iloc[1:] + + try: # write to benchmark csv cache daily_close.to_csv(get_data_filepath(filename, environ)) except (OSError, IOError, HTTPError): logger.exception('Failed to cache the new benchmark returns') raise + if not has_data_for_dates(daily_close, first_date, last_date): logger.warn("Still don't have expected data after redownload!") + return daily_close @@ -535,7 +556,7 @@ def _load_cached_data(filename, first_date, last_date, now, resource_name, if os.path.exists(path): try: data = from_csv(path) - data.index = data.index.to_datetime().tz_localize('UTC') + data.index = pd.to_datetime(data.index).tz_localize('UTC') if has_data_for_dates(data, first_date, last_date): return data