Expose poloniex data curation methods to load benchmark dynamically

This commit is contained in:
Conner Fromknecht
2017-07-01 12:46:44 -07:00
parent 8134b96ac3
commit d2d297d348
3 changed files with 52 additions and 73 deletions
View File
@@ -6,17 +6,14 @@ import time
import requests
import logbook
import catalyst.data.bundles.core as bundles
DT_START = time.mktime(datetime(2010, 01, 01, 0, 0).timetuple())
# DT_START = time.mktime(datetime(2017, 06, 13, 0, 0).timetuple()) # TODO: remove temp
CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/'
CONN_RETRIES = 2
logbook.StderrHandler().push_application()
log = logbook.Logger(__name__)
class PoloniexDataGenerator(object):
class PoloniexCurator(object):
"""
OHLCV data feed generator for crypto data. Based on Poloniex market data
"""
@@ -89,7 +86,8 @@ class PoloniexDataGenerator(object):
log.debug('Getting data for %s' % currencyPair)
csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv'
start = self._get_start_date(csv_fn)
if (time.time() > start): # Only fetch data if more than 5min have passed since last fetch
# Only fetch data if more than 5min have passed since last fetch
if (time.time() > start):
data = self.get_data(currencyPair, start)
if data is not None:
try:
@@ -98,7 +96,14 @@ class PoloniexDataGenerator(object):
for item in data:
if item['date'] == 0:
continue
csvwriter.writerow([item['date'], item['open'], item['high'], item['low'], item['close'], item['volume']])
csvwriter.writerow([
item['date'],
item['open'],
item['high'],
item['low'],
item['close'],
item['volume'],
])
except Exception as e:
log.error('Error opening %s' % csv_fn)
log.exception(e)
@@ -112,7 +117,8 @@ class PoloniexDataGenerator(object):
def append_data(self):
for currencyPair in self.currency_pairs:
self.append_data_single_pair(currencyPair)
time.sleep(0.17) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe
# Rate limit is 6 calls per second, sleep 1sec/6 to be safe
time.sleep(0.17)
'''
Returns a data frame for all pairs, or for the requests currency pair.
@@ -130,57 +136,9 @@ class PoloniexDataGenerator(object):
df['date']=pd.to_datetime(df['date'],unit='s')
df.set_index('date', inplace=True)
#return df.loc[(df.index > start) & (df.index <= end)]
return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)]
if __name__ == '__main__':
pdg = PoloniexDataGenerator()
pdg.get_currency_pairs()
pdg.append_data()
# from zipline.utils.calendars import get_calendar
# from zipline.data.us_equity_pricing import (
# BcolzDailyBarWriter,
# BcolzDailyBarReader,
# )
# open_calendar = get_calendar('OPEN')
# start_session = pd.Timestamp('2012-12-31', tz='UTC')
# end_session = pd.Timestamp('2015-01-01', tz='UTC')
# file_path = 'test.bcolz'
# writer = BcolzDailyBarWriter(
# file_path,
# open_calendar,
# start_session,
# end_session
# )
# index = open_calendar.schedule.index
# index = index[
# (index.date >= start_session.date()) &
# (index.date <= end_session.date())
# ]
# data = pd.DataFrame(
# 0,
# index=index,
# columns=['open', 'high', 'low', 'close', 'volume'],
# )
# writer.write(
# [(0, data)],
# assets=[0],
# show_progress=True
# )
# print 'len(index):', len(index)
# reader = BcolzDailyBarReader(file_path)
# print 'first_rows:', reader._first_rows
# print 'last_rows:', reader._last_rows
pc = PoloniexCurator()
pc.get_currency_pairs()
pc.append_data()
+36 -15
View File
@@ -31,6 +31,8 @@ from ..utils.paths import (
data_root,
)
from ..utils.deprecate import deprecated
from catalyst.curate.poloniex import PoloniexCurator
from catalyst.utils.calendars import get_calendar
@@ -310,6 +312,8 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
return daily_bars
five_min_bars = None
try:
# load five minute bars from csv cache
five_min_bars = pd.read_csv(
@@ -320,31 +324,48 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
date_parser=dateparse,
)
five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s')
except (OSError, IOError):
# Otherwise load from Poloniex API
try:
pc = PoloniexCurator()
pc.append_data_single_pair(symbol)
# compute daily bars for open calendar
open_calendar = get_calendar('OPEN')
daily_bars = compute_daily_bars(
five_min_bars,
open_calendar.all_sessions,
)
five_min_bars = pc.to_dataframe(
first_date,
last_date,
currencyPair=symbol,
)
except (OSError, IOError, HTTPError):
logger.exception('Failed to new crypto benchmark returns')
raise
# filter daily bars to include first_date and last_date
daily_bars = daily_bars[
(daily_bars.index >= (first_date - trading_day)) &
(daily_bars.index <= last_date)
]
# compute daily bars for open calendar
open_calendar = get_calendar('OPEN')
daily_bars = compute_daily_bars(
five_min_bars,
open_calendar.all_sessions,
)
# select close column and compute percent change between days
daily_close = daily_bars[['close']]
daily_close = daily_close.pct_change(1).iloc[1:]
# filter daily bars to include first_date and last_date
daily_bars = daily_bars[
(daily_bars.index >= (first_date - trading_day)) &
(daily_bars.index <= last_date)
]
# select close column and compute percent change between days
daily_close = daily_bars[['close']]
daily_close = daily_close.pct_change(1).iloc[1:]
try:
# write to benchmark csv cache
daily_close.to_csv(get_data_filepath(filename, environ))
except (OSError, IOError, HTTPError):
logger.exception('Failed to cache the new benchmark returns')
raise
if not has_data_for_dates(daily_close, first_date, last_date):
logger.warn("Still don't have expected data after redownload!")
return daily_close
@@ -535,7 +556,7 @@ def _load_cached_data(filename, first_date, last_date, now, resource_name,
if os.path.exists(path):
try:
data = from_csv(path)
data.index = data.index.to_datetime().tz_localize('UTC')
data.index = pd.to_datetime(data.index).tz_localize('UTC')
if has_data_for_dates(data, first_date, last_date):
return data