From 72e07e242f39765599f1846d241c92610f2f2774 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Thu, 20 Jul 2017 01:37:55 -0400 Subject: [PATCH 1/2] WIP: curating 1min Poloniex data - no append --- catalyst/curate/poloniex.py | 79 +++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index e92a9a11..f1ba76f1 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -67,18 +67,68 @@ class PoloniexCurator(object): return DT_START - def get_data(self, currencyPair, start, end=9999999999, period=300): - url = self._api_path + 'command=returnChartData¤cyPair=' + currencyPair + '&start=' + str(start) + '&end=' + str(end) + '&period=' + str(period) + def get_data(self, currencyPair, start, end=9999999999, prev_df=None): + log.debug(currencyPair+': Retrieving from '+str(start)+' to '+str(end)) + + ''' + Poloniex limits a single query to returnTradeHistory to less than a year between start and end + ''' + if(end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) try: response = requests.get(url) except Exception as e: - log.error('Failed to retrieve candlestick chart data for %s' % currencyPair) + log.error('Failed to retrieve trade history data for %s' % currencyPair) log.exception(e) return None - return response.json() + log.debug(currencyPair+': Received '+str(len(response.json()))+' trades.') + if(len(response.json())==1 and not isinstance(response.json(),list)): + r = response.json() + print(r) + if(r['error']): + log.error(r['error']) + return None + df = pd.DataFrame(data=response.json(), columns = ['date','rate', 'total', 'tradeID']) + df['rate'] = pd.to_numeric( df['rate'], errors='coerce') # Convert rate to float + df['total'] = pd.to_numeric( df['total'], errors='coerce') # Convert vol to float + df['tradeID'] = pd.to_numeric( df['tradeID'], errors='coerce') # Convert vol to float + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) # Convert date + df.set_index('tradeID', inplace=True) # Index by tradeID + df = df.iloc[::-1] # Reverse timeseries as TradeHistory is provided newest to oldest + + if(prev_df is not None): + if(prev_df.index[0] == df.index[0]): + return prev_df + df = prev_df.combine_first(df) + + first = df['date'].iloc[0].value // 10 ** 9 + df = self.get_data( currencyPair, start, first, df ) + return df + + + def generate_ohlcv(self, df): + + df.set_index('date', inplace=True) # Index by date + vol = df['total'].to_frame('volume') # Will deal with vol separately, as ohlc() messes it up + df.drop('total', axis=1, inplace=True) # Drop volume data from dataframe + ohlc = df.resample('T').ohlc() # Resample OHLC in 5min bins + ohlc.columns = ohlc.columns.map(lambda t: t[1]) # Raname columns by dropping 'rate' + closes = ohlc['close'].fillna(method='pad') # Pad forward missing 'close' + ohlc = ohlc.apply(lambda x: x.fillna(closes)) # Fill N/A with last close + vol = vol.resample('T').sum().fillna(0) # Add volumes by bin + ohlcv = pd.concat([ohlc,vol], axis=1) # Concatenate OHLC + Volume + + return ohlcv + ''' Pulls latest data for a single pair ''' @@ -88,21 +138,24 @@ class PoloniexCurator(object): start = self._get_start_date(csv_fn) # Only fetch data if more than 5min have passed since last fetch if (time.time() > start): - data = self.get_data(currencyPair, start) + data = self.get_data(currencyPair, start) + if data is not None: + ohlcv = self.generate_ohlcv(data) + try: with open(csv_fn, 'ab') as csvfile: csvwriter = csv.writer(csvfile) - for item in data: - if item['date'] == 0: + for item in ohlcv.itertuples(): + if item.Index == 0: continue csvwriter.writerow([ - item['date'], - item['open'], - item['high'], - item['low'], - item['close'], - item['volume'], + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, ]) except Exception as e: log.error('Error opening %s' % csv_fn) From d1241252585646e7bced357c72472e1e159df3b9 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 21 Jul 2017 16:22:29 -0400 Subject: [PATCH 2/2] WIP: trades to disk - no append/no ingestion --- catalyst/curate/poloniex.py | 109 ++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index f1ba76f1..33eb46c9 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -9,6 +9,8 @@ import logbook DT_START = time.mktime(datetime(2010, 1, 1, 0, 0).timetuple()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' CONN_RETRIES = 2 +COINS = ['USDT_BTC','USDT_DASH','USDT_ETC','USDT_ETH','USDT_LTC','USDT_NXT','USDT_REP','USDT_STR','USDT_XMR','USDT_XRP','USDT_ZEC'] + logbook.StderrHandler().push_application() log = logbook.Logger(__name__) @@ -114,6 +116,101 @@ class PoloniexCurator(object): df = self.get_data( currencyPair, start, first, df ) return df + def retrieve_trade_history(self, currencyPair, start, end=9999999999): + csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + + try: + with open(csv_fn, 'ab+') as f: + f.seek(0, os.SEEK_END) + if(f.tell() > 2): # First check file is not zero size + f.seek(-2, os.SEEK_END) # Jump to the second last byte. + while f.read(1) != b"\n": # Until EOL is found... + f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. + lastrow = f.readline() # read last line + last_tradeID = int(lastrow.split(',')[0]) + end = pd.to_datetime( lastrow.split(',')[1], infer_datetime_format=True).value // 10 ** 9 + + except Exception as e: + log.error('Error opening file: %s' % csv_fn) + log.exception(e) + + ''' + Poloniex API limits querying TradeHistory to intervals smaller than 1 year, + so we make sure that start date is never more than 1 year apart from end date + ''' + if( end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + log.debug(currencyPair+': Retrieving from '+str(newstart)+' to '+str(end)) + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) + + try: + response = requests.get(url) + except Exception as e: + log.error('Failed to retrieve trade history data for %s' % currencyPair) + log.exception(e) + return None + + if('last_tradeID' in locals() and response.json()[-1]['tradeID'] == last_tradeID): # Got to the end of TradingHistory for this coin + return + + try: + with open(csv_fn, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in response.json(): + if( 'last_tradeID' in locals() and item['tradeID'] >= last_tradeID ): + continue + csvwriter.writerow([ + item['tradeID'], + item['date'], + item['type'], + item['rate'], + item['amount'], + item['total'], + item['globalTradeID'] + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + + end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 + + self.retrieve_trade_history(currencyPair, start, end) # If we get here, we aren't done. Repeat + + def write_ohlcv_file(self, currencyPair): + + csv_trades = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' + if( os.path.isfile(csv_1min) ): + log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') + else: + df = pd.read_csv(csv_trades, names=['tradeID','date','type','rate','amount','total','globalTradeID'] ) + df.drop(['tradeID','type','amount','globalTradeID'], axis=1, inplace=True) + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) + ohlcv = self.generate_ohlcv(df) + try: + with open(csv_1min, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in ohlcv.itertuples(): + if item.Index == 0: + continue + csvwriter.writerow([ + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + log.debug(currencyPair+': Generated 1min OHLCV data.') def generate_ohlcv(self, df): @@ -171,10 +268,10 @@ class PoloniexCurator(object): for currencyPair in self.currency_pairs: self.append_data_single_pair(currencyPair) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe - time.sleep(0.17) + #time.sleep(0.17) ''' - Returns a data frame for all pairs, or for the requests currency pair. + Returns a data frame for all pairs, or for the requested currency pair. Makes sure data is up to date ''' def to_dataframe(self, start, end, currencyPair=None): @@ -193,5 +290,9 @@ class PoloniexCurator(object): if __name__ == '__main__': pc = PoloniexCurator() - pc.get_currency_pairs() - pc.append_data() + #pc.get_currency_pairs() + #pc.append_data() + + for coin in COINS: + # pc.retrieve_trade_history(coin,DT_START) + pc.write_ohlcv_file(coin)