From ad2d0e9253350f3ece26ca25e1ca6efd11a6c1f3 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Thu, 7 Sep 2017 14:26:55 -0400 Subject: [PATCH 01/29] Fixed path issue with obsolete branch --- catalyst/exchange/exchange_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 3b559726..5d69e4e8 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -8,8 +8,9 @@ from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \ ExchangeSymbolsNotFound from catalyst.utils.paths import data_root, ensure_directory +# TODO: move to aws SYMBOLS_URL = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \ - 'exchange-trading/catalyst/exchange/{exchange}/symbols.json' + 'master/catalyst/exchange/{exchange}/symbols.json' def get_exchange_folder(exchange_name, environ=None): From 48143d321266c4374731ccb1267a6ce70065c491 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 13 Sep 2017 17:22:11 -0600 Subject: [PATCH 02/29] WIP: Fixes 1/1000 price issue in history, and works with full coins. Requires matching-version 'catalyst ingest' --- catalyst/data/_equities.pyx | 2 +- catalyst/data/bundles/poloniex.py | 2 +- catalyst/data/history_loader.py | 2 +- catalyst/data/us_equity_pricing.py | 11 +++++++---- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/catalyst/data/_equities.pyx b/catalyst/data/_equities.pyx index 81f9a66e..563fa56a 100644 --- a/catalyst/data/_equities.pyx +++ b/catalyst/data/_equities.pyx @@ -217,7 +217,7 @@ cpdef _read_bcolz_data(ctable_t table, if column_name in ['open', 'high', 'low', 'close']: where_nan = (outbuf == 0) - outbuf_as_float = outbuf.astype(float64) * .000001 + outbuf_as_float = outbuf.astype(float64) * .000000001 outbuf_as_float[where_nan] = NAN results.append(outbuf_as_float) elif column_name != 'volume': diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index 4dcdf7bc..eb4fc735 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -105,7 +105,7 @@ class PoloniexBundle(BaseCryptoPricingBundle): # BcolzDailyBarReader introduces a 1/1000 factor in the way pricing is stored # on disk, which we compensate here to get the right pricing amounts # ref: data/us_equity_pricing.py - scale = 1000 + scale = 1 raw.loc[:, 'open'] /= scale raw.loc[:, 'high'] /= scale raw.loc[:, 'low'] /= scale diff --git a/catalyst/data/history_loader.py b/catalyst/data/history_loader.py index 8bc00563..e33f5f6e 100644 --- a/catalyst/data/history_loader.py +++ b/catalyst/data/history_loader.py @@ -38,7 +38,7 @@ from catalyst.utils.numpy_utils import float64_dtype from catalyst.utils.pandas_utils import find_in_sorted_index # Default number of decimal places used for rounding asset prices. -DEFAULT_ASSET_PRICE_DECIMALS = 3 +DEFAULT_ASSET_PRICE_DECIMALS = 9 class HistoryCompatibleUSEquityAdjustmentReader(object): diff --git a/catalyst/data/us_equity_pricing.py b/catalyst/data/us_equity_pricing.py index 03d01a4e..901a0e60 100644 --- a/catalyst/data/us_equity_pricing.py +++ b/catalyst/data/us_equity_pricing.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import division # Python2 req to have division of ints yield float + from errno import ENOENT from functools import partial from os import remove @@ -80,7 +83,6 @@ from catalyst.utils.cli import ( from ._equities import _compute_row_slices, _read_bcolz_data from ._adjustments import load_adjustments_from_sqlite - logger = logbook.Logger('UsEquityPricing') OHLC = frozenset(['open', 'high', 'low', 'close']) @@ -116,6 +118,8 @@ SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES = { UINT32_MAX = iinfo(uint32).max UINT64_MAX = iinfo(uint64).max +PRICE_ADJUSTMENT_FACTOR = 1000000000 # Provides 9 decimals resolution. Also affects _equities.pyx L220 + def check_uint32_safe(value, colname): if value >= UINT32_MAX: @@ -433,7 +437,7 @@ class BcolzDailyBarWriter(object): return raw_data winsorise_uint64(raw_data, invalid_data_behavior, 'volume', *OHLC) - processed = (raw_data[list(OHLC)] * 1000000).astype('uint64') + processed = (raw_data[list(OHLC)] * PRICE_ADJUSTMENT_FACTOR).astype('uint64') dates = raw_data.index.values.astype('datetime64[s]') check_uint32_safe(dates.max().view(np.int64), 'day') processed['day'] = dates.astype('uint32') @@ -519,7 +523,6 @@ class BcolzDailyBarReader(SessionBarReader): # Need to test keeping the entire array in memory for the course of a # process first. self._spot_cols = {} - self.PRICE_ADJUSTMENT_FACTOR = 0.001 self._read_all_threshold = read_all_threshold @lazyval @@ -763,7 +766,7 @@ class BcolzDailyBarReader(SessionBarReader): if price == 0: return nan else: - return price * 0.001 + return price / PRICE_ADJUSTMENT_FACTOR else: return price From e5a137f205fc55760832abd5db3fb906b27ffb44 Mon Sep 17 00:00:00 2001 From: Andrew Campbell Date: Fri, 21 Jul 2017 18:05:32 -0500 Subject: [PATCH 03/29] ENH: Bound trade amount with asset specific min trade size --- catalyst/algorithm.py | 17 +++++++---------- catalyst/assets/_assets.pyx | 15 +++++++++++---- catalyst/assets/asset_db_schema.py | 3 ++- catalyst/assets/asset_writer.py | 1 + catalyst/finance/slippage.py | 7 +++++-- catalyst/finance/transaction.py | 6 +----- catalyst/utils/math_utils.py | 2 ++ 7 files changed, 29 insertions(+), 22 deletions(-) diff --git a/catalyst/algorithm.py b/catalyst/algorithm.py index ed10fedd..5601f133 100644 --- a/catalyst/algorithm.py +++ b/catalyst/algorithm.py @@ -125,6 +125,7 @@ from catalyst.utils.factory import create_simulation_parameters from catalyst.utils.math_utils import ( tolerant_equals, round_if_near_integer, + round_nearest ) from catalyst.utils.pandas_utils import clear_dataframe_indexer_caches from catalyst.utils.preprocess import preprocess @@ -1488,7 +1489,7 @@ class TradingAlgorithm(object): def _calculate_order(self, asset, amount, limit_price=None, stop_price=None, style=None): - amount = self.round_order(amount) + amount = self.round_order(amount, asset) # Raises a ZiplineError if invalid parameters are detected. self.validate_order_params(asset, @@ -1505,16 +1506,13 @@ class TradingAlgorithm(object): return amount, style @staticmethod - def round_order(amount): + def round_order(amount, asset): """ - Convert number of shares to an integer. - - By default, truncates to the integer share count that's either within - .0001 of amount or closer to zero. - - E.g. 3.9999 -> 4.0; 5.5 -> 5.0; -5.5 -> -5.0 + Converts the number of shares to the smallest tradable lot size for + the asset being ordered. + """ - return int(round_if_near_integer(amount)) + return round_nearest(amount, asset.min_trade_size) def validate_order_params(self, asset, @@ -1550,7 +1548,6 @@ class TradingAlgorithm(object): self.updated_portfolio(), self.get_datetime(), self.trading_client.current_data) - @staticmethod def __convert_order_params_for_blotter(limit_price, stop_price, style): """ diff --git a/catalyst/assets/_assets.pyx b/catalyst/assets/_assets.pyx index cdc34008..05144223 100644 --- a/catalyst/assets/_assets.pyx +++ b/catalyst/assets/_assets.pyx @@ -59,6 +59,7 @@ cdef class Asset: cdef readonly object exchange cdef readonly object exchange_full + cdef readonly object min_trade_size _kwargnames = frozenset({ 'sid', @@ -70,6 +71,7 @@ cdef class Asset: 'auto_close_date', 'exchange', 'exchange_full', + 'min_trade_size', }) def __init__(self, @@ -81,7 +83,8 @@ cdef class Asset: object end_date=None, object first_traded=None, object auto_close_date=None, - object exchange_full=None): + object exchange_full=None, + object min_trade_size=None): self.sid = sid self.sid_hash = hash(sid) @@ -94,6 +97,7 @@ cdef class Asset: self.end_date = end_date self.first_traded = first_traded self.auto_close_date = auto_close_date + self.min_trade_size = min_trade_size def __int__(self): return self.sid @@ -148,7 +152,8 @@ cdef class Asset: def __repr__(self): attrs = ('symbol', 'asset_name', 'exchange', - 'start_date', 'end_date', 'first_traded', 'auto_close_date') + 'start_date', 'end_date', 'first_traded', 'auto_close_date', + 'min_trade_size') tuples = ((attr, repr(getattr(self, attr, None))) for attr in attrs) strings = ('%s=%s' % (t[0], t[1]) for t in tuples) @@ -170,7 +175,8 @@ cdef class Asset: self.end_date, self.first_traded, self.auto_close_date, - self.exchange_full)) + self.exchange_full, + self.min_trade_size)) cpdef to_dict(self): """ @@ -186,6 +192,7 @@ cdef class Asset: 'auto_close_date': self.auto_close_date, 'exchange': self.exchange, 'exchange_full': self.exchange_full, + 'min_trade_size': self.min_trade_size } @classmethod @@ -234,7 +241,7 @@ cdef class Equity(Asset): def __repr__(self): attrs = ('symbol', 'asset_name', 'exchange', 'start_date', 'end_date', 'first_traded', 'auto_close_date', - 'exchange_full') + 'exchange_full', 'min_trade_size') tuples = ((attr, repr(getattr(self, attr, None))) for attr in attrs) strings = ('%s=%s' % (t[0], t[1]) for t in tuples) diff --git a/catalyst/assets/asset_db_schema.py b/catalyst/assets/asset_db_schema.py index 9c9c98c2..35e062f9 100644 --- a/catalyst/assets/asset_db_schema.py +++ b/catalyst/assets/asset_db_schema.py @@ -39,7 +39,8 @@ equities = sa.Table( sa.Column('first_traded', sa.Integer), sa.Column('auto_close_date', sa.Integer), sa.Column('exchange', sa.Text), - sa.Column('exchange_full', sa.Text) + sa.Column('exchange_full', sa.Text), + sa.Column('min_trade_size', sa.Float) ) equity_symbol_mappings = sa.Table( diff --git a/catalyst/assets/asset_writer.py b/catalyst/assets/asset_writer.py index 9910db72..1ea5afb2 100644 --- a/catalyst/assets/asset_writer.py +++ b/catalyst/assets/asset_writer.py @@ -73,6 +73,7 @@ _equities_defaults = { 'exchange': None, # optional, something like "New York Stock Exchange" 'exchange_full': None, + 'min_trade_size': None } # Default values for the futures DataFrame diff --git a/catalyst/finance/slippage.py b/catalyst/finance/slippage.py index 118a5b58..cfba2683 100644 --- a/catalyst/finance/slippage.py +++ b/catalyst/finance/slippage.py @@ -41,6 +41,7 @@ DEFAULT_EQUITY_VOLUME_SLIPPAGE_BAR_LIMIT = 0.025 DEFAULT_FUTURE_VOLUME_SLIPPAGE_BAR_LIMIT = 0.05 + class LiquidityExceeded(Exception): pass @@ -205,12 +206,14 @@ class VolumeShareSlippage(SlippageModel): def process_order(self, data, order): volume = data.current(order.asset, "volume") + min_trade_size = order.asset.min_trade_size + max_volume = self.volume_limit * volume # price impact accounts for the total volume of transactions # created against the current minute bar remaining_volume = max_volume - self.volume_for_bar - if remaining_volume < 1: + if remaining_volume < min_trade_size: # we can't fill any more transactions raise LiquidityExceeded() @@ -218,7 +221,7 @@ class VolumeShareSlippage(SlippageModel): # volume available in the bar or the open amount. cur_volume = int(min(remaining_volume, abs(order.open_amount))) - if cur_volume < 1: + if cur_volume < min_trade_size: return None, None # tally the current amount into our total amount ordered. diff --git a/catalyst/finance/transaction.py b/catalyst/finance/transaction.py index 3a388890..8d215dcc 100644 --- a/catalyst/finance/transaction.py +++ b/catalyst/finance/transaction.py @@ -65,14 +65,10 @@ def create_transaction(order, dt, price, amount): # floor the amount to protect against non-whole number orders # TODO: Investigate whether we can add a robust check in blotter # and/or tradesimulation, as well. - amount_magnitude = int(abs(amount)) - - if amount_magnitude < 1: - raise Exception("Transaction magnitude must be at least 1.") transaction = Transaction( asset=order.asset, - amount=int(amount), + amount=amounts, dt=dt, price=price, order_id=order.id diff --git a/catalyst/utils/math_utils.py b/catalyst/utils/math_utils.py index 16fdb99d..7981a52b 100644 --- a/catalyst/utils/math_utils.py +++ b/catalyst/utils/math_utils.py @@ -17,6 +17,8 @@ import math from numpy import isnan +def round_nearest(x, a): + return round(round(x / a) * a, -int(math.floor(math.log10(a)))) def tolerant_equals(a, b, atol=10e-7, rtol=10e-7, equal_nan=False): """Check if a and b are equal with some tolerance. From c3897cfa5a1f598a6575337102c48dadb82c6f74 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Thu, 14 Sep 2017 15:22:51 -0600 Subject: [PATCH 04/29] ENH: wrapping up asset min_trade_size for fractional coinsup to 1/100000000th of a coin --- catalyst/assets/asset_writer.py | 4 +++- catalyst/data/bundles/base_pricing.py | 1 + catalyst/data/bundles/poloniex.py | 2 ++ catalyst/finance/slippage.py | 2 +- catalyst/finance/transaction.py | 2 +- 5 files changed, 8 insertions(+), 3 deletions(-) diff --git a/catalyst/assets/asset_writer.py b/catalyst/assets/asset_writer.py index 1ea5afb2..ec30ffab 100644 --- a/catalyst/assets/asset_writer.py +++ b/catalyst/assets/asset_writer.py @@ -73,7 +73,7 @@ _equities_defaults = { 'exchange': None, # optional, something like "New York Stock Exchange" 'exchange_full': None, - 'min_trade_size': None + 'min_trade_size': 1 } # Default values for the futures DataFrame @@ -391,6 +391,8 @@ class AssetDBWriter(object): The date on which to close any positions in this asset. exchange : str The exchange where this asset is traded. + min_trade_size: float, optional + The minimum denomination this asset can be traded. The index of this dataframe should contain the sids. futures : pd.DataFrame, optional diff --git a/catalyst/data/bundles/base_pricing.py b/catalyst/data/bundles/base_pricing.py index a0abd51a..c5281fdd 100644 --- a/catalyst/data/bundles/base_pricing.py +++ b/catalyst/data/bundles/base_pricing.py @@ -24,6 +24,7 @@ class BasePricingBundle(BaseBundle): ('start_date', 'datetime64[ns]'), ('end_date', 'datetime64[ns]'), ('ac_date', 'datetime64[ns]'), + ('min_trade_size', 'float'), ] @lazyval diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index eb4fc735..8b5628d5 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -75,12 +75,14 @@ class PoloniexBundle(BaseCryptoPricingBundle): start_date = sym_data.index[0] end_date = sym_data.index[-1] ac_date = end_date + pd.Timedelta(days=1) + min_trade_size = 0.00000001 return ( sym_md.symbol, start_date, end_date, ac_date, + min_trade_size, ) def fetch_raw_symbol_frame(self, diff --git a/catalyst/finance/slippage.py b/catalyst/finance/slippage.py index cfba2683..36de4ec7 100644 --- a/catalyst/finance/slippage.py +++ b/catalyst/finance/slippage.py @@ -219,7 +219,7 @@ class VolumeShareSlippage(SlippageModel): # the current order amount will be the min of the # volume available in the bar or the open amount. - cur_volume = int(min(remaining_volume, abs(order.open_amount))) + cur_volume = min(remaining_volume, abs(order.open_amount)) if cur_volume < min_trade_size: return None, None diff --git a/catalyst/finance/transaction.py b/catalyst/finance/transaction.py index 8d215dcc..f9d72c38 100644 --- a/catalyst/finance/transaction.py +++ b/catalyst/finance/transaction.py @@ -68,7 +68,7 @@ def create_transaction(order, dt, price, amount): transaction = Transaction( asset=order.asset, - amount=amounts, + amount=amount, dt=dt, price=price, order_id=order.id From 72e07e242f39765599f1846d241c92610f2f2774 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Thu, 20 Jul 2017 01:37:55 -0400 Subject: [PATCH 05/29] WIP: curating 1min Poloniex data - no append --- catalyst/curate/poloniex.py | 79 +++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index e92a9a11..f1ba76f1 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -67,18 +67,68 @@ class PoloniexCurator(object): return DT_START - def get_data(self, currencyPair, start, end=9999999999, period=300): - url = self._api_path + 'command=returnChartData¤cyPair=' + currencyPair + '&start=' + str(start) + '&end=' + str(end) + '&period=' + str(period) + def get_data(self, currencyPair, start, end=9999999999, prev_df=None): + log.debug(currencyPair+': Retrieving from '+str(start)+' to '+str(end)) + + ''' + Poloniex limits a single query to returnTradeHistory to less than a year between start and end + ''' + if(end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) try: response = requests.get(url) except Exception as e: - log.error('Failed to retrieve candlestick chart data for %s' % currencyPair) + log.error('Failed to retrieve trade history data for %s' % currencyPair) log.exception(e) return None - return response.json() + log.debug(currencyPair+': Received '+str(len(response.json()))+' trades.') + if(len(response.json())==1 and not isinstance(response.json(),list)): + r = response.json() + print(r) + if(r['error']): + log.error(r['error']) + return None + df = pd.DataFrame(data=response.json(), columns = ['date','rate', 'total', 'tradeID']) + df['rate'] = pd.to_numeric( df['rate'], errors='coerce') # Convert rate to float + df['total'] = pd.to_numeric( df['total'], errors='coerce') # Convert vol to float + df['tradeID'] = pd.to_numeric( df['tradeID'], errors='coerce') # Convert vol to float + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) # Convert date + df.set_index('tradeID', inplace=True) # Index by tradeID + df = df.iloc[::-1] # Reverse timeseries as TradeHistory is provided newest to oldest + + if(prev_df is not None): + if(prev_df.index[0] == df.index[0]): + return prev_df + df = prev_df.combine_first(df) + + first = df['date'].iloc[0].value // 10 ** 9 + df = self.get_data( currencyPair, start, first, df ) + return df + + + def generate_ohlcv(self, df): + + df.set_index('date', inplace=True) # Index by date + vol = df['total'].to_frame('volume') # Will deal with vol separately, as ohlc() messes it up + df.drop('total', axis=1, inplace=True) # Drop volume data from dataframe + ohlc = df.resample('T').ohlc() # Resample OHLC in 5min bins + ohlc.columns = ohlc.columns.map(lambda t: t[1]) # Raname columns by dropping 'rate' + closes = ohlc['close'].fillna(method='pad') # Pad forward missing 'close' + ohlc = ohlc.apply(lambda x: x.fillna(closes)) # Fill N/A with last close + vol = vol.resample('T').sum().fillna(0) # Add volumes by bin + ohlcv = pd.concat([ohlc,vol], axis=1) # Concatenate OHLC + Volume + + return ohlcv + ''' Pulls latest data for a single pair ''' @@ -88,21 +138,24 @@ class PoloniexCurator(object): start = self._get_start_date(csv_fn) # Only fetch data if more than 5min have passed since last fetch if (time.time() > start): - data = self.get_data(currencyPair, start) + data = self.get_data(currencyPair, start) + if data is not None: + ohlcv = self.generate_ohlcv(data) + try: with open(csv_fn, 'ab') as csvfile: csvwriter = csv.writer(csvfile) - for item in data: - if item['date'] == 0: + for item in ohlcv.itertuples(): + if item.Index == 0: continue csvwriter.writerow([ - item['date'], - item['open'], - item['high'], - item['low'], - item['close'], - item['volume'], + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, ]) except Exception as e: log.error('Error opening %s' % csv_fn) From d1241252585646e7bced357c72472e1e159df3b9 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 21 Jul 2017 16:22:29 -0400 Subject: [PATCH 06/29] WIP: trades to disk - no append/no ingestion --- catalyst/curate/poloniex.py | 109 ++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index f1ba76f1..33eb46c9 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -9,6 +9,8 @@ import logbook DT_START = time.mktime(datetime(2010, 1, 1, 0, 0).timetuple()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' CONN_RETRIES = 2 +COINS = ['USDT_BTC','USDT_DASH','USDT_ETC','USDT_ETH','USDT_LTC','USDT_NXT','USDT_REP','USDT_STR','USDT_XMR','USDT_XRP','USDT_ZEC'] + logbook.StderrHandler().push_application() log = logbook.Logger(__name__) @@ -114,6 +116,101 @@ class PoloniexCurator(object): df = self.get_data( currencyPair, start, first, df ) return df + def retrieve_trade_history(self, currencyPair, start, end=9999999999): + csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + + try: + with open(csv_fn, 'ab+') as f: + f.seek(0, os.SEEK_END) + if(f.tell() > 2): # First check file is not zero size + f.seek(-2, os.SEEK_END) # Jump to the second last byte. + while f.read(1) != b"\n": # Until EOL is found... + f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. + lastrow = f.readline() # read last line + last_tradeID = int(lastrow.split(',')[0]) + end = pd.to_datetime( lastrow.split(',')[1], infer_datetime_format=True).value // 10 ** 9 + + except Exception as e: + log.error('Error opening file: %s' % csv_fn) + log.exception(e) + + ''' + Poloniex API limits querying TradeHistory to intervals smaller than 1 year, + so we make sure that start date is never more than 1 year apart from end date + ''' + if( end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + log.debug(currencyPair+': Retrieving from '+str(newstart)+' to '+str(end)) + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) + + try: + response = requests.get(url) + except Exception as e: + log.error('Failed to retrieve trade history data for %s' % currencyPair) + log.exception(e) + return None + + if('last_tradeID' in locals() and response.json()[-1]['tradeID'] == last_tradeID): # Got to the end of TradingHistory for this coin + return + + try: + with open(csv_fn, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in response.json(): + if( 'last_tradeID' in locals() and item['tradeID'] >= last_tradeID ): + continue + csvwriter.writerow([ + item['tradeID'], + item['date'], + item['type'], + item['rate'], + item['amount'], + item['total'], + item['globalTradeID'] + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + + end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 + + self.retrieve_trade_history(currencyPair, start, end) # If we get here, we aren't done. Repeat + + def write_ohlcv_file(self, currencyPair): + + csv_trades = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' + if( os.path.isfile(csv_1min) ): + log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') + else: + df = pd.read_csv(csv_trades, names=['tradeID','date','type','rate','amount','total','globalTradeID'] ) + df.drop(['tradeID','type','amount','globalTradeID'], axis=1, inplace=True) + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) + ohlcv = self.generate_ohlcv(df) + try: + with open(csv_1min, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in ohlcv.itertuples(): + if item.Index == 0: + continue + csvwriter.writerow([ + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + log.debug(currencyPair+': Generated 1min OHLCV data.') def generate_ohlcv(self, df): @@ -171,10 +268,10 @@ class PoloniexCurator(object): for currencyPair in self.currency_pairs: self.append_data_single_pair(currencyPair) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe - time.sleep(0.17) + #time.sleep(0.17) ''' - Returns a data frame for all pairs, or for the requests currency pair. + Returns a data frame for all pairs, or for the requested currency pair. Makes sure data is up to date ''' def to_dataframe(self, start, end, currencyPair=None): @@ -193,5 +290,9 @@ class PoloniexCurator(object): if __name__ == '__main__': pc = PoloniexCurator() - pc.get_currency_pairs() - pc.append_data() + #pc.get_currency_pairs() + #pc.append_data() + + for coin in COINS: + # pc.retrieve_trade_history(coin,DT_START) + pc.write_ohlcv_file(coin) From 01eefd67e01595a31b96e455e4b451ceab754550 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 15 Sep 2017 10:51:04 -0600 Subject: [PATCH 07/29] ingestion switch to create_writers when ingesting locally --- catalyst/data/bundles/poloniex.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index eb4fc735..85ee6b8d 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys + from datetime import datetime import pandas as pd @@ -166,4 +168,9 @@ register_bundle(PoloniexBundle, ['USDT_BTC',]) For a production environment make sure to use (to bundle all pairs): register_bundle(PoloniexBundle) ''' -register_bundle(PoloniexBundle, create_writers=False) + +if 'ingest' in sys.argv and '-c' in sys.argv: + register_bundle(PoloniexBundle) +else: + register_bundle(PoloniexBundle, create_writers=False) + From 4a4277d9d1f26031b42e6b4a0c84b814b4c7429e Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Thu, 20 Jul 2017 01:37:55 -0400 Subject: [PATCH 08/29] WIP: curating 1min Poloniex data - no append --- catalyst/curate/poloniex.py | 79 +++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index e92a9a11..f1ba76f1 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -67,18 +67,68 @@ class PoloniexCurator(object): return DT_START - def get_data(self, currencyPair, start, end=9999999999, period=300): - url = self._api_path + 'command=returnChartData¤cyPair=' + currencyPair + '&start=' + str(start) + '&end=' + str(end) + '&period=' + str(period) + def get_data(self, currencyPair, start, end=9999999999, prev_df=None): + log.debug(currencyPair+': Retrieving from '+str(start)+' to '+str(end)) + + ''' + Poloniex limits a single query to returnTradeHistory to less than a year between start and end + ''' + if(end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) try: response = requests.get(url) except Exception as e: - log.error('Failed to retrieve candlestick chart data for %s' % currencyPair) + log.error('Failed to retrieve trade history data for %s' % currencyPair) log.exception(e) return None - return response.json() + log.debug(currencyPair+': Received '+str(len(response.json()))+' trades.') + if(len(response.json())==1 and not isinstance(response.json(),list)): + r = response.json() + print(r) + if(r['error']): + log.error(r['error']) + return None + df = pd.DataFrame(data=response.json(), columns = ['date','rate', 'total', 'tradeID']) + df['rate'] = pd.to_numeric( df['rate'], errors='coerce') # Convert rate to float + df['total'] = pd.to_numeric( df['total'], errors='coerce') # Convert vol to float + df['tradeID'] = pd.to_numeric( df['tradeID'], errors='coerce') # Convert vol to float + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) # Convert date + df.set_index('tradeID', inplace=True) # Index by tradeID + df = df.iloc[::-1] # Reverse timeseries as TradeHistory is provided newest to oldest + + if(prev_df is not None): + if(prev_df.index[0] == df.index[0]): + return prev_df + df = prev_df.combine_first(df) + + first = df['date'].iloc[0].value // 10 ** 9 + df = self.get_data( currencyPair, start, first, df ) + return df + + + def generate_ohlcv(self, df): + + df.set_index('date', inplace=True) # Index by date + vol = df['total'].to_frame('volume') # Will deal with vol separately, as ohlc() messes it up + df.drop('total', axis=1, inplace=True) # Drop volume data from dataframe + ohlc = df.resample('T').ohlc() # Resample OHLC in 5min bins + ohlc.columns = ohlc.columns.map(lambda t: t[1]) # Raname columns by dropping 'rate' + closes = ohlc['close'].fillna(method='pad') # Pad forward missing 'close' + ohlc = ohlc.apply(lambda x: x.fillna(closes)) # Fill N/A with last close + vol = vol.resample('T').sum().fillna(0) # Add volumes by bin + ohlcv = pd.concat([ohlc,vol], axis=1) # Concatenate OHLC + Volume + + return ohlcv + ''' Pulls latest data for a single pair ''' @@ -88,21 +138,24 @@ class PoloniexCurator(object): start = self._get_start_date(csv_fn) # Only fetch data if more than 5min have passed since last fetch if (time.time() > start): - data = self.get_data(currencyPair, start) + data = self.get_data(currencyPair, start) + if data is not None: + ohlcv = self.generate_ohlcv(data) + try: with open(csv_fn, 'ab') as csvfile: csvwriter = csv.writer(csvfile) - for item in data: - if item['date'] == 0: + for item in ohlcv.itertuples(): + if item.Index == 0: continue csvwriter.writerow([ - item['date'], - item['open'], - item['high'], - item['low'], - item['close'], - item['volume'], + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, ]) except Exception as e: log.error('Error opening %s' % csv_fn) From 6fddb925637d06cc7f3b3263e3e2172cc1338a62 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 21 Jul 2017 16:22:29 -0400 Subject: [PATCH 09/29] WIP: trades to disk - no append/no ingestion --- catalyst/curate/poloniex.py | 109 ++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index f1ba76f1..33eb46c9 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -9,6 +9,8 @@ import logbook DT_START = time.mktime(datetime(2010, 1, 1, 0, 0).timetuple()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' CONN_RETRIES = 2 +COINS = ['USDT_BTC','USDT_DASH','USDT_ETC','USDT_ETH','USDT_LTC','USDT_NXT','USDT_REP','USDT_STR','USDT_XMR','USDT_XRP','USDT_ZEC'] + logbook.StderrHandler().push_application() log = logbook.Logger(__name__) @@ -114,6 +116,101 @@ class PoloniexCurator(object): df = self.get_data( currencyPair, start, first, df ) return df + def retrieve_trade_history(self, currencyPair, start, end=9999999999): + csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + + try: + with open(csv_fn, 'ab+') as f: + f.seek(0, os.SEEK_END) + if(f.tell() > 2): # First check file is not zero size + f.seek(-2, os.SEEK_END) # Jump to the second last byte. + while f.read(1) != b"\n": # Until EOL is found... + f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. + lastrow = f.readline() # read last line + last_tradeID = int(lastrow.split(',')[0]) + end = pd.to_datetime( lastrow.split(',')[1], infer_datetime_format=True).value // 10 ** 9 + + except Exception as e: + log.error('Error opening file: %s' % csv_fn) + log.exception(e) + + ''' + Poloniex API limits querying TradeHistory to intervals smaller than 1 year, + so we make sure that start date is never more than 1 year apart from end date + ''' + if( end == 9999999999 and time.time() - start > 365*86400 ): + newstart = time.time() - 360*86400 + elif( end != 9999999999 and end - start > 365*86400 ): + newstart = end - 360*86400 + else: + newstart = start + + log.debug(currencyPair+': Retrieving from '+str(newstart)+' to '+str(end)) + + url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) + + try: + response = requests.get(url) + except Exception as e: + log.error('Failed to retrieve trade history data for %s' % currencyPair) + log.exception(e) + return None + + if('last_tradeID' in locals() and response.json()[-1]['tradeID'] == last_tradeID): # Got to the end of TradingHistory for this coin + return + + try: + with open(csv_fn, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in response.json(): + if( 'last_tradeID' in locals() and item['tradeID'] >= last_tradeID ): + continue + csvwriter.writerow([ + item['tradeID'], + item['date'], + item['type'], + item['rate'], + item['amount'], + item['total'], + item['globalTradeID'] + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + + end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 + + self.retrieve_trade_history(currencyPair, start, end) # If we get here, we aren't done. Repeat + + def write_ohlcv_file(self, currencyPair): + + csv_trades = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' + if( os.path.isfile(csv_1min) ): + log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') + else: + df = pd.read_csv(csv_trades, names=['tradeID','date','type','rate','amount','total','globalTradeID'] ) + df.drop(['tradeID','type','amount','globalTradeID'], axis=1, inplace=True) + df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) + ohlcv = self.generate_ohlcv(df) + try: + with open(csv_1min, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in ohlcv.itertuples(): + if item.Index == 0: + continue + csvwriter.writerow([ + item.Index.value // 10 ** 9, + item.open, + item.high, + item.low, + item.close, + item.volume, + ]) + except Exception as e: + log.error('Error opening %s' % csv_fn) + log.exception(e) + log.debug(currencyPair+': Generated 1min OHLCV data.') def generate_ohlcv(self, df): @@ -171,10 +268,10 @@ class PoloniexCurator(object): for currencyPair in self.currency_pairs: self.append_data_single_pair(currencyPair) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe - time.sleep(0.17) + #time.sleep(0.17) ''' - Returns a data frame for all pairs, or for the requests currency pair. + Returns a data frame for all pairs, or for the requested currency pair. Makes sure data is up to date ''' def to_dataframe(self, start, end, currencyPair=None): @@ -193,5 +290,9 @@ class PoloniexCurator(object): if __name__ == '__main__': pc = PoloniexCurator() - pc.get_currency_pairs() - pc.append_data() + #pc.get_currency_pairs() + #pc.append_data() + + for coin in COINS: + # pc.retrieve_trade_history(coin,DT_START) + pc.write_ohlcv_file(coin) From 91e71c5e380c2d270b7428849d695185bd594237 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 09:15:39 -0600 Subject: [PATCH 10/29] WIP: bundling 1min data --- catalyst/curate/poloniex.py | 113 ++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 30 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index 33eb46c9..d63be3f9 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -1,15 +1,15 @@ import json, time, csv from datetime import datetime import pandas as pd -import os -import time -import requests -import logbook +import os, time, shutil, requests, logbook -DT_START = time.mktime(datetime(2010, 1, 1, 0, 0).timetuple()) +DT_START = int(time.mktime(datetime(2010, 1, 1, 0, 0).timetuple())) +DT_END = int(time.time()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' +CSV_OUT_FOLDER = '/Volumes/enigma/data/poloniex/' CONN_RETRIES = 2 COINS = ['USDT_BTC','USDT_DASH','USDT_ETC','USDT_ETH','USDT_LTC','USDT_NXT','USDT_REP','USDT_STR','USDT_XMR','USDT_XRP','USDT_ZEC'] +COINS = ['USDT_BTC',] logbook.StderrHandler().push_application() @@ -116,36 +116,44 @@ class PoloniexCurator(object): df = self.get_data( currencyPair, start, first, df ) return df - def retrieve_trade_history(self, currencyPair, start, end=9999999999): + def _retrieve_tradeID_date(self, row): + tId = int(row.split(',')[0]) + d = pd.to_datetime( row.split(',')[1], infer_datetime_format=True).value // 10 ** 9 + return tId, d + + + def retrieve_trade_history(self, currencyPair, start=DT_START, end=DT_END, temp=None): csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' try: with open(csv_fn, 'ab+') as f: f.seek(0, os.SEEK_END) - if(f.tell() > 2): # First check file is not zero size + if(f.tell() > 2): # First check file is not zero size + f.seek(0) # Go to the beginning to read first line + last_tradeID, end_file = self._retrieve_tradeID_date(f.readline()) f.seek(-2, os.SEEK_END) # Jump to the second last byte. while f.read(1) != b"\n": # Until EOL is found... f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. - lastrow = f.readline() # read last line - last_tradeID = int(lastrow.split(',')[0]) - end = pd.to_datetime( lastrow.split(',')[1], infer_datetime_format=True).value // 10 ** 9 + first_tradeID, start_file = self._retrieve_tradeID_date(f.readline()) + + if( first_tradeID == 1 and end_file + 3600 > DT_END ): + return except Exception as e: log.error('Error opening file: %s' % csv_fn) log.exception(e) ''' - Poloniex API limits querying TradeHistory to intervals smaller than 1 year, - so we make sure that start date is never more than 1 year apart from end date + Poloniex API limits querying TradeHistory to intervals smaller than 1 month, + so we make sure that start date is never more than 1 month apart from end date ''' - if( end == 9999999999 and time.time() - start > 365*86400 ): - newstart = time.time() - 360*86400 - elif( end != 9999999999 and end - start > 365*86400 ): - newstart = end - 360*86400 + if( end - start > 2419200 ): # 60 s/min * 60 min/hr * 24 hr/day * 28 days + newstart = end - 2419200 else: newstart = start - log.debug(currencyPair+': Retrieving from '+str(newstart)+' to '+str(end)) + log.debug(currencyPair+': Retrieving from '+str(newstart)+' to '+str(end) +'\t ' + + time.ctime(newstart) + ' - '+ time.ctime(end)) url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) @@ -155,31 +163,63 @@ class PoloniexCurator(object): log.error('Failed to retrieve trade history data for %s' % currencyPair) log.exception(e) return None + else: + if isinstance(response.json(), dict) and response.json()['error']: + log.error('Failed to to retrieve trade history data for %s: %s' % (currencyPair,response.json()['error'])) + exit(1) - if('last_tradeID' in locals() and response.json()[-1]['tradeID'] == last_tradeID): # Got to the end of TradingHistory for this coin + if('first_tradeID' in locals() and response.json()[-1]['tradeID'] == first_tradeID): # Got to the end of TradingHistory for this coin return try: - with open(csv_fn, 'ab') as csvfile: - csvwriter = csv.writer(csvfile) + if( 'end_file' in locals() and end_file + 3600 < end): + if (temp is None): + temp = os.tmpfile() + tempcsv = csv.writer(temp) for item in response.json(): - if( 'last_tradeID' in locals() and item['tradeID'] >= last_tradeID ): + if( item['tradeID'] <= last_tradeID ): continue - csvwriter.writerow([ + tempcsv.writerow([ item['tradeID'], item['date'], item['type'], item['rate'], item['amount'], item['total'], - item['globalTradeID'] + item['globalTradeID'] ]) + if( response.json()[-1]['tradeID'] > last_tradeID ): + end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 + self.retrieve_trade_history(currencyPair, start, end, temp=temp) + else: + with open(csv_fn,'rb+') as f: + shutil.copyfileobj(f,temp) + f.seek(0) + temp.seek(0) + shutil.copyfileobj(temp,f) + temp.close() + end = start_file + else: + with open(csv_fn, 'ab') as csvfile: + csvwriter = csv.writer(csvfile) + for item in response.json(): + if( 'first_tradeID' in locals() and item['tradeID'] >= first_tradeID ): + continue + csvwriter.writerow([ + item['tradeID'], + item['date'], + item['type'], + item['rate'], + item['amount'], + item['total'], + item['globalTradeID'] + ]) + end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 + except Exception as e: log.error('Error opening %s' % csv_fn) log.exception(e) - end = pd.to_datetime( response.json()[-1]['date'], infer_datetime_format=True).value // 10 ** 9 - self.retrieve_trade_history(currencyPair, start, end) # If we get here, we aren't done. Repeat def write_ohlcv_file(self, currencyPair): @@ -189,7 +229,8 @@ class PoloniexCurator(object): if( os.path.isfile(csv_1min) ): log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') else: - df = pd.read_csv(csv_trades, names=['tradeID','date','type','rate','amount','total','globalTradeID'] ) + df = pd.read_csv(csv_trades, names=['tradeID','date','type','rate','amount','total','globalTradeID'], + dtype = {'tradeID': int, 'date': str, 'type': str, 'rate': float, 'amount': float, 'total': float, 'globalTradeID': int } ) df.drop(['tradeID','type','amount','globalTradeID'], axis=1, inplace=True) df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) ohlcv = self.generate_ohlcv(df) @@ -288,11 +329,23 @@ class PoloniexCurator(object): return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)] + def onemin_to_dataframe(self, currencyPair, start, end): + csv_fn = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' + df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) + df['date'] = pd.to_datetime(df['date'],unit='s') + df.set_index('date', inplace=True) + return df[start : end] + if __name__ == '__main__': pc = PoloniexCurator() - #pc.get_currency_pairs() + pc.get_currency_pairs() #pc.append_data() - for coin in COINS: - # pc.retrieve_trade_history(coin,DT_START) - pc.write_ohlcv_file(coin) + #for coin in COINS: + for currencyPair in pc.currency_pairs: + #csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' + #if( os.path.isfile(csv_1min) ): + # log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') + #else: + pc.retrieve_trade_history(currencyPair) + pc.write_ohlcv_file(currencyPair) From 36c2564bb0769b122ea6de2c45ce0462830d02a1 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 11:05:09 -0600 Subject: [PATCH 11/29] splitting plot styles: dark for live, default for backtesting --- catalyst/exchange/live_graph_clock.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalyst/exchange/live_graph_clock.py b/catalyst/exchange/live_graph_clock.py index 877f8115..60e7503b 100644 --- a/catalyst/exchange/live_graph_clock.py +++ b/catalyst/exchange/live_graph_clock.py @@ -24,8 +24,6 @@ from matplotlib import style log = Logger('LiveGraphClock') -style.use('dark_background') - fmt = mdates.DateFormatter('%Y-%m-%d %H:%M') @@ -63,6 +61,8 @@ class LiveGraphClock(object): self._before_trading_start_bar_yielded = True self.context = context + style.use('dark_background') + fig = plt.figure() fig.canvas.set_window_title('Enigma Catalyst: {}'.format( self.context.algo_namespace)) From 05a69cfc920638c7c0472b4af1c50f6cbca155df Mon Sep 17 00:00:00 2001 From: Abner Ayala-Acevedo Date: Wed, 20 Sep 2017 10:16:29 -0700 Subject: [PATCH 12/29] Added environment.yml for simpler conda installation Simpler conda installation by using environment.yml requirements. `conda env create -f python2.7-environment.yml` `Linux or Mac: source activate catalyst` `Windows: activate catalyst` --- etc/python2.7-environment.yml | 58 +++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 etc/python2.7-environment.yml diff --git a/etc/python2.7-environment.yml b/etc/python2.7-environment.yml new file mode 100644 index 00000000..557bc656 --- /dev/null +++ b/etc/python2.7-environment.yml @@ -0,0 +1,58 @@ +name: catalyst +channels: +- defaults +dependencies: +- certifi=2016.2.28=py27_0 +- openssl=1.0.2l=0 +- pip=9.0.1=py27_1 +- python=2.7.13=0 +- readline=6.2=2 +- setuptools=36.4.0=py27_0 +- sqlite=3.13.0=0 +- tk=8.5.18=0 +- wheel=0.29.0=py27_0 +- zlib=1.2.11=0 +- pip: + - alembic==0.9.5 + - bcolz==0.12.1 + - bottleneck==1.2.1 + - chardet==3.0.4 + - click==6.7 + - contextlib2==0.5.5 + - cycler==0.10.0 + - cyordereddict==1.0.0 + - cython==0.26.1 + - decorator==4.1.2 + - empyrical==0.2.1 + - enigma-catalyst==0.2.dev2 + - functools32==3.2.3.post2 + - idna==2.6 + - intervaltree==2.1.0 + - logbook==1.1.0 + - lru-dict==1.1.6 + - mako==1.0.7 + - markupsafe==1.0 + - matplotlib==2.0.2 + - multipledispatch==0.4.9 + - networkx==1.11 + - numexpr==2.6.4 + - numpy==1.13.1 + - pandas==0.19.2 + - pandas-datareader==0.5.0 + - patsy==0.4.1 + - pyparsing==2.2.0 + - python-dateutil==2.6.1 + - python-editor==1.0.3 + - pytz==2017.2 + - requests==2.18.4 + - requests-file==1.4.2 + - requests-ftp==0.3.1 + - scipy==0.19.1 + - six==1.11.0 + - sortedcontainers==1.5.7 + - sqlalchemy==1.1.14 + - statsmodels==0.8.0 + - subprocess32==3.2.7 + - tables==3.4.2 + - toolz==0.8.2 + - urllib3==1.22 From 06f48cf15819480e98ebec8bdd3a0ee93f548ae5 Mon Sep 17 00:00:00 2001 From: Abner Ayala-Acevedo Date: Wed, 20 Sep 2017 10:41:42 -0700 Subject: [PATCH 13/29] Update to include python-dev --- etc/python2.7-environment.yml | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/etc/python2.7-environment.yml b/etc/python2.7-environment.yml index 557bc656..b31a1585 100644 --- a/etc/python2.7-environment.yml +++ b/etc/python2.7-environment.yml @@ -1,19 +1,30 @@ name: catalyst channels: +- statiskit - defaults dependencies: - certifi=2016.2.28=py27_0 +- coverage=4.4.1=py27_0 +- nose=1.3.7=py27_1 - openssl=1.0.2l=0 +- path.py=10.3.1=py27_0 - pip=9.0.1=py27_1 - python=2.7.13=0 +- pyyaml=3.12=py27_0 - readline=6.2=2 - setuptools=36.4.0=py27_0 +- six=1.10.0=py27_0 - sqlite=3.13.0=0 - tk=8.5.18=0 - wheel=0.29.0=py27_0 +- yaml=0.1.6=0 - zlib=1.2.11=0 +- libdev=1.0.0=py27_0 +- python-dev=1.0.0=py27_0 +- python-scons=3.0.0=py27_0 - pip: - alembic==0.9.5 + - backports.shutil-get-terminal-size==1.0.0 - bcolz==0.12.1 - bottleneck==1.2.1 - chardet==3.0.4 @@ -25,9 +36,14 @@ dependencies: - decorator==4.1.2 - empyrical==0.2.1 - enigma-catalyst==0.2.dev2 + - enum34==1.1.6 - functools32==3.2.3.post2 - idna==2.6 - intervaltree==2.1.0 + - ipdb==0.10.3 + - ipdbplugin==1.4.5 + - ipython==5.5.0 + - ipython-genutils==0.2.0 - logbook==1.1.0 - lru-dict==1.1.6 - mako==1.0.7 @@ -39,7 +55,13 @@ dependencies: - numpy==1.13.1 - pandas==0.19.2 - pandas-datareader==0.5.0 + - pathlib2==2.3.0 - patsy==0.4.1 + - pexpect==4.2.1 + - pickleshare==0.7.4 + - prompt-toolkit==1.0.15 + - ptyprocess==0.5.2 + - pygments==2.2.0 - pyparsing==2.2.0 - python-dateutil==2.6.1 - python-editor==1.0.3 @@ -47,12 +69,16 @@ dependencies: - requests==2.18.4 - requests-file==1.4.2 - requests-ftp==0.3.1 + - scandir==1.5 - scipy==0.19.1 - - six==1.11.0 + - scons==3.0.0a20170821 + - simplegeneric==0.8.1 - sortedcontainers==1.5.7 - sqlalchemy==1.1.14 - statsmodels==0.8.0 - subprocess32==3.2.7 - tables==3.4.2 - toolz==0.8.2 + - traitlets==4.3.2 - urllib3==1.22 + - wcwidth==0.1.7 From 81bd2d84f037fb793ae0747d21a00ecec2bb5902 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 12:47:31 -0600 Subject: [PATCH 14/29] >=0.2.dev2 for catalyst, since we're in active dev and will change periodically --- etc/python2.7-environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/python2.7-environment.yml b/etc/python2.7-environment.yml index b31a1585..34fabcbb 100644 --- a/etc/python2.7-environment.yml +++ b/etc/python2.7-environment.yml @@ -35,7 +35,7 @@ dependencies: - cython==0.26.1 - decorator==4.1.2 - empyrical==0.2.1 - - enigma-catalyst==0.2.dev2 + - enigma-catalyst>=0.2.dev2 - enum34==1.1.6 - functools32==3.2.3.post2 - idna==2.6 From a1bc174740bc629aeb43a984583cea09771e2615 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 15:11:18 -0600 Subject: [PATCH 15/29] Wrapping up 1min data for Poloniex in backtesting --- catalyst/curate/poloniex.py | 218 +++++++++--------------------- catalyst/data/bundles/poloniex.py | 32 +++-- 2 files changed, 82 insertions(+), 168 deletions(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index d63be3f9..911ac25f 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -6,19 +6,15 @@ import os, time, shutil, requests, logbook DT_START = int(time.mktime(datetime(2010, 1, 1, 0, 0).timetuple())) DT_END = int(time.time()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' -CSV_OUT_FOLDER = '/Volumes/enigma/data/poloniex/' CONN_RETRIES = 2 -COINS = ['USDT_BTC','USDT_DASH','USDT_ETC','USDT_ETH','USDT_LTC','USDT_NXT','USDT_REP','USDT_STR','USDT_XMR','USDT_XRP','USDT_ZEC'] -COINS = ['USDT_BTC',] - logbook.StderrHandler().push_application() log = logbook.Logger(__name__) class PoloniexCurator(object): - """ + ''' OHLCV data feed generator for crypto data. Based on Poloniex market data - """ + ''' _api_path = 'https://poloniex.com/public?' currency_pairs = [] @@ -31,6 +27,9 @@ class PoloniexCurator(object): log.error('Failed to create data folder: %s' % CSV_OUT_FOLDER) log.exception(e) + ''' + Retrieves and returns all currency pairs from the exchange + ''' def get_currency_pairs(self): url = self._api_path + 'command=returnTicker' @@ -49,82 +48,29 @@ class PoloniexCurator(object): log.debug('Currency pairs retrieved successfully: %d' % (len(self.currency_pairs))) - def _get_start_date(self, csv_fn): - ''' Function returns latest appended date, if the file has been previously written - the last line is an empty one, so we have to read the second to last line - ''' - try: - with open(csv_fn, 'ab+') as f: - f.seek(0, os.SEEK_END) # First check file is not zero size - if(f.tell() > 2): - f.seek(-2, os.SEEK_END) # Jump to the second last byte. - while f.read(1) != b"\n": # Until EOL is found... - f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. - lastrow = f.readline() - return int(lastrow.split(',')[0]) + 300 - - except Exception as e: - log.error('Error opening file: %s' % csv_fn) - log.exception(e) - - return DT_START - - def get_data(self, currencyPair, start, end=9999999999, prev_df=None): - log.debug(currencyPair+': Retrieving from '+str(start)+' to '+str(end)) - - ''' - Poloniex limits a single query to returnTradeHistory to less than a year between start and end - ''' - if(end == 9999999999 and time.time() - start > 365*86400 ): - newstart = time.time() - 360*86400 - elif( end != 9999999999 and end - start > 365*86400 ): - newstart = end - 360*86400 - else: - newstart = start - - url = self._api_path + 'command=returnTradeHistory¤cyPair=' + currencyPair + '&start=' + str(newstart) + '&end=' + str(end) - - try: - response = requests.get(url) - except Exception as e: - log.error('Failed to retrieve trade history data for %s' % currencyPair) - log.exception(e) - return None - - log.debug(currencyPair+': Received '+str(len(response.json()))+' trades.') - if(len(response.json())==1 and not isinstance(response.json(),list)): - r = response.json() - print(r) - if(r['error']): - log.error(r['error']) - return None - - df = pd.DataFrame(data=response.json(), columns = ['date','rate', 'total', 'tradeID']) - df['rate'] = pd.to_numeric( df['rate'], errors='coerce') # Convert rate to float - df['total'] = pd.to_numeric( df['total'], errors='coerce') # Convert vol to float - df['tradeID'] = pd.to_numeric( df['tradeID'], errors='coerce') # Convert vol to float - df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True) # Convert date - df.set_index('tradeID', inplace=True) # Index by tradeID - df = df.iloc[::-1] # Reverse timeseries as TradeHistory is provided newest to oldest - - if(prev_df is not None): - if(prev_df.index[0] == df.index[0]): - return prev_df - df = prev_df.combine_first(df) - - first = df['date'].iloc[0].value // 10 ** 9 - df = self.get_data( currencyPair, start, first, df ) - return df + ''' + Helper function that reads tradeID and date fields from CSV readline + ''' def _retrieve_tradeID_date(self, row): tId = int(row.split(',')[0]) d = pd.to_datetime( row.split(',')[1], infer_datetime_format=True).value // 10 ** 9 return tId, d - + ''' + Retrieves TradeHistory from exchange for a given currencyPair between start and end dates. + If no start date is provided, uses a system-wide one (beginning of time for cryptotrading) + If no end date is provided, 'now' is used + Stores results in CSV file on disk. + This function is called recursively to work around the limitations imposed by the provider API. + ''' def retrieve_trade_history(self, currencyPair, start=DT_START, end=DT_END, temp=None): csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + ''' + Check what data we already have on disk, reading first and last lines from file. + Data is stored on file from NEWEST to OLDEST. + ''' try: with open(csv_fn, 'ab+') as f: f.seek(0, os.SEEK_END) @@ -168,9 +114,22 @@ class PoloniexCurator(object): log.error('Failed to to retrieve trade history data for %s: %s' % (currencyPair,response.json()['error'])) exit(1) - if('first_tradeID' in locals() and response.json()[-1]['tradeID'] == first_tradeID): # Got to the end of TradingHistory for this coin + ''' + If we get to transactionId == 1, and we already have that on disk, + we got to the end of TradeHistory for this coin. + ''' + if('first_tradeID' in locals() and response.json()[-1]['tradeID'] == first_tradeID): return + ''' + There are primarily two scenarios: + a) There is newer data available that we need to add at the beginning + of the file. We'll retrieve all what we need until we get to what + we already have, writing it to a temporary file; and we will write + that at the beginning of our existing file. + b) We are going back in time, appending at the end of our existing + TradeHistory until the first transaction for this currencyPair + ''' try: if( 'end_file' in locals() and end_file + 3600 < end): if (temp is None): @@ -220,10 +179,34 @@ class PoloniexCurator(object): log.error('Error opening %s' % csv_fn) log.exception(e) - self.retrieve_trade_history(currencyPair, start, end) # If we get here, we aren't done. Repeat + ''' + If we got here, we aren't done yet. Call recursively with 'end' times + that go sequentially back in time. + ''' + self.retrieve_trade_history(currencyPair, start, end) - def write_ohlcv_file(self, currencyPair): - + + ''' + Generates OHLCV dataframe from a dataframe containing all TradeHistory + by resampling with 1-minute period + ''' + def generate_ohlcv(self, df): + df.set_index('date', inplace=True) # Index by date + vol = df['total'].to_frame('volume') # Will deal with vol separately, as ohlc() messes it up + df.drop('total', axis=1, inplace=True) # Drop volume data from dataframe + ohlc = df.resample('T').ohlc() # Resample OHLC in 1min bins + ohlc.columns = ohlc.columns.map(lambda t: t[1]) # Raname columns by dropping 'rate' + closes = ohlc['close'].fillna(method='pad') # Pad forward missing 'close' + ohlc = ohlc.apply(lambda x: x.fillna(closes)) # Fill N/A with last close + vol = vol.resample('T').sum().fillna(0) # Add volumes by bin + ohlcv = pd.concat([ohlc,vol], axis=1) # Concatenate OHLC + Volume + return ohlcv + + + ''' + Generates OHLCV data file with 1minute bars from TradeHistory on disk + ''' + def write_ohlcv_file(self, currencyPair): csv_trades = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' if( os.path.isfile(csv_1min) ): @@ -253,82 +236,10 @@ class PoloniexCurator(object): log.exception(e) log.debug(currencyPair+': Generated 1min OHLCV data.') - def generate_ohlcv(self, df): - - df.set_index('date', inplace=True) # Index by date - vol = df['total'].to_frame('volume') # Will deal with vol separately, as ohlc() messes it up - df.drop('total', axis=1, inplace=True) # Drop volume data from dataframe - ohlc = df.resample('T').ohlc() # Resample OHLC in 5min bins - ohlc.columns = ohlc.columns.map(lambda t: t[1]) # Raname columns by dropping 'rate' - closes = ohlc['close'].fillna(method='pad') # Pad forward missing 'close' - ohlc = ohlc.apply(lambda x: x.fillna(closes)) # Fill N/A with last close - vol = vol.resample('T').sum().fillna(0) # Add volumes by bin - ohlcv = pd.concat([ohlc,vol], axis=1) # Concatenate OHLC + Volume - - return ohlcv - - ''' - Pulls latest data for a single pair - ''' - def append_data_single_pair(self, currencyPair, repeat=0): - log.debug('Getting data for %s' % currencyPair) - csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv' - start = self._get_start_date(csv_fn) - # Only fetch data if more than 5min have passed since last fetch - if (time.time() > start): - data = self.get_data(currencyPair, start) - - if data is not None: - ohlcv = self.generate_ohlcv(data) - - try: - with open(csv_fn, 'ab') as csvfile: - csvwriter = csv.writer(csvfile) - for item in ohlcv.itertuples(): - if item.Index == 0: - continue - csvwriter.writerow([ - item.Index.value // 10 ** 9, - item.open, - item.high, - item.low, - item.close, - item.volume, - ]) - except Exception as e: - log.error('Error opening %s' % csv_fn) - log.exception(e) - elif (repeat < CONN_RETRIES): - log.debug('Retrying: attemt %d' % (repeat+1) ) - self.append_data_single_pair(currencyPair, repeat + 1) ''' - Pulls latest data for all currency pairs + Returns a data frame for a given currencyPair from data on disk ''' - def append_data(self): - for currencyPair in self.currency_pairs: - self.append_data_single_pair(currencyPair) - # Rate limit is 6 calls per second, sleep 1sec/6 to be safe - #time.sleep(0.17) - - ''' - Returns a data frame for all pairs, or for the requested currency pair. - Makes sure data is up to date - ''' - def to_dataframe(self, start, end, currencyPair=None): - csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv' - last_date = self._get_start_date(csv_fn) - if last_date + 300 < end or not os.path.exists(csv_fn): - # get latest data - self.append_data_single_pair(currencyPair) - - # CSV holds the latest snapshot - df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) - df['date']=pd.to_datetime(df['date'],unit='s') - df.set_index('date', inplace=True) - - return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)] - def onemin_to_dataframe(self, currencyPair, start, end): csv_fn = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) @@ -336,16 +247,11 @@ class PoloniexCurator(object): df.set_index('date', inplace=True) return df[start : end] + if __name__ == '__main__': pc = PoloniexCurator() pc.get_currency_pairs() - #pc.append_data() - #for coin in COINS: for currencyPair in pc.currency_pairs: - #csv_1min = CSV_OUT_FOLDER + 'crypto_1min-' + currencyPair + '.csv' - #if( os.path.isfile(csv_1min) ): - # log.debug(currencyPair+': 1min data already present. Delete the file if you want to rebuild it.') - #else: pc.retrieve_trade_history(currencyPair) pc.write_ohlcv_file(currencyPair) diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index 9ad60d01..60bbac95 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -25,6 +25,8 @@ from catalyst.data.bundles.core import register_bundle from catalyst.data.bundles.base_pricing import BaseCryptoPricingBundle from catalyst.utils.memoize import lazyval +from catalyst.curate.poloniex import PoloniexCurator + class PoloniexBundle(BaseCryptoPricingBundle): @lazyval def name(self): @@ -38,7 +40,7 @@ class PoloniexBundle(BaseCryptoPricingBundle): def frequencies(self): return set(( 'daily', - #'5-minute', + 'minute', )) @lazyval @@ -94,17 +96,23 @@ class PoloniexBundle(BaseCryptoPricingBundle): start_date, end_date, frequency): - raw = pd.read_json( - self._format_data_url( - api_key, - symbol, - start_date, - end_date, - frequency, - ), - orient='records', - ) - raw.set_index('date', inplace=True) + + if(frequency == 'minute'): + pc = PoloniexCurator() + raw = pc.onemin_to_dataframe(symbol, start_date, end_date) + + else: + raw = pd.read_json( + self._format_data_url( + api_key, + symbol, + start_date, + end_date, + frequency, + ), + orient='records', + ) + raw.set_index('date', inplace=True) # BcolzDailyBarReader introduces a 1/1000 factor in the way pricing is stored # on disk, which we compensate here to get the right pricing amounts From 7359cdc48fcf3fa170f2486bb3075c1c17478b3d Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 16:20:57 -0600 Subject: [PATCH 16/29] fix data.history error with tz-aware dataframe --- catalyst/data/resample.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/catalyst/data/resample.py b/catalyst/data/resample.py index 3154e590..dc325c39 100644 --- a/catalyst/data/resample.py +++ b/catalyst/data/resample.py @@ -156,7 +156,10 @@ class DailyHistoryAggregator(object): cache = self._caches[field] = (session, market_open, {}) _, market_open, entries = cache - market_open = market_open.tz_localize('UTC') + try: + market_open = market_open.tz_localize('UTC') + except TypeError: + market_open = market_open.tz_convert('UTC') if dt != market_open: prev_dt = dt_value - self._one_min else: From 1f56325895674994c84531589b27bf2415926684 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 20 Sep 2017 23:37:55 -0600 Subject: [PATCH 17/29] fix price resolution in 1-minute data bundle: 8 decimal places --- catalyst/data/bundles/base.py | 2 +- catalyst/data/minute_bars.py | 70 +++++++++++++++++------------------ 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/catalyst/data/bundles/base.py b/catalyst/data/bundles/base.py index 23640abd..135dd531 100644 --- a/catalyst/data/bundles/base.py +++ b/catalyst/data/bundles/base.py @@ -491,7 +491,7 @@ class BaseBundle(object): data_frequency, ) raw_data.index = pd.to_datetime(raw_data.index, utc=True) - raw_data.index = raw_data.index.tz_localize('UTC') + #raw_data.index = raw_data.index.tz_localize('UTC') # Filter incoming data to fit start and end sessions. raw_data = raw_data[ diff --git a/catalyst/data/minute_bars.py b/catalyst/data/minute_bars.py index d2707122..bcbb64ae 100644 --- a/catalyst/data/minute_bars.py +++ b/catalyst/data/minute_bars.py @@ -39,7 +39,7 @@ from catalyst.data._minute_bar_internal import ( from catalyst.gens.sim_engine import NANOS_IN_MINUTE from catalyst.data.bar_reader import BarReader, NoDataOnDate -from catalyst.data.us_equity_pricing import check_uint32_safe +from catalyst.data.us_equity_pricing import check_uint64_safe from catalyst.utils.calendars import get_calendar from catalyst.utils.cli import maybe_show_progress from catalyst.utils.memoize import lazyval @@ -52,7 +52,7 @@ FUTURES_MINUTES_PER_DAY = 1440 DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15 -OHLC_RATIO = 1000 +OHLC_RATIO = 100000000 class BcolzMinuteOverlappingData(Exception): @@ -114,15 +114,15 @@ def _sid_subdir_path(sid): def convert_cols(cols, scale_factor, sid, invalid_data_behavior): - """Adapt OHLCV columns into uint32 columns. + """Adapt OHLCV columns into uint64 columns. Parameters ---------- cols : dict A dict mapping each column name (open, high, low, close, volume) - to a float column to convert to uint32. + to a float column to convert to uint64. scale_factor : int - Factor to use to scale float values before converting to uint32. + Factor to use to scale float values before converting to uint64. sid : int Sid of the relevant asset, for logging. invalid_data_behavior : str @@ -135,6 +135,7 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): scaled_highs = np.nan_to_num(cols['high']) * scale_factor scaled_lows = np.nan_to_num(cols['low']) * scale_factor scaled_closes = np.nan_to_num(cols['close']) * scale_factor + scaled_volumes = np.nan_to_num(cols['volume']) * scale_factor exclude_mask = np.zeros_like(scaled_opens, dtype=bool) @@ -143,11 +144,12 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): ('high', scaled_highs), ('low', scaled_lows), ('close', scaled_closes), + ('volume', scaled_volumes), ]: max_val = scaled_col.max() try: - check_uint32_safe(max_val, col_name) + check_uint64_safe(max_val, col_name) except ValueError: if invalid_data_behavior == 'raise': raise @@ -155,20 +157,20 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): if invalid_data_behavior == 'warn': logger.warn( 'Values for sid={}, col={} contain some too large for ' - 'uint32 (max={}), filtering them out', + 'uint64 (max={}), filtering them out', sid, col_name, max_val, ) # We want to exclude all rows that have an unsafe value in # this column. - exclude_mask &= (scaled_col >= np.iinfo(np.uint32).max) + exclude_mask &= (scaled_col >= np.iinfo(np.uint64).max) # Convert all cols to uint32. - opens = scaled_opens.astype(np.uint32) - highs = scaled_highs.astype(np.uint32) - lows = scaled_lows.astype(np.uint32) - closes = scaled_closes.astype(np.uint32) - volumes = cols['volume'].astype(np.uint32) + opens = scaled_opens.astype(np.uint64) + highs = scaled_highs.astype(np.uint64) + lows = scaled_lows.astype(np.uint64) + closes = scaled_closes.astype(np.uint64) + volumes = scaled_volumes.astype(np.uint64) # Exclude rows with unsafe values by setting to zero. opens[exclude_mask] = 0 @@ -288,7 +290,7 @@ class BcolzMinuteBarMetadata(object): ohlc_ratio : int The default ratio by which to multiply the pricing data to convert the floats from floats to an integer to fit within - the np.uint32. If ohlc_ratios_per_sid is None or does not + the np.uint64. If ohlc_ratios_per_sid is None or does not contain a mapping for a given sid, this ratio is used. ohlc_ratios_per_sid : dict A dict mapping each sid in the output to the factor by @@ -372,13 +374,13 @@ class BcolzMinuteBarWriter(object): The last trading session in the data set. default_ohlc_ratio : int, optional The default ratio by which to multiply the pricing data to - convert from floats to integers that fit within np.uint32. If + convert from floats to integers that fit within np.uint64. If ohlc_ratios_per_sid is None or does not contain a mapping for a - given sid, this ratio is used. Default is OHLC_RATIO (1000). + given sid, this ratio is used. Default is OHLC_RATIO (10^8). ohlc_ratios_per_sid : dict, optional A dict mapping each sid in the output to the ratio by which to multiply the pricing data to convert the floats from floats to - an integer to fit within the np.uint32. + an integer to fit within the np.uint64. expectedlen : int, optional The expected length of the dataset, used when creating the initial bcolz ctable. @@ -401,11 +403,9 @@ class BcolzMinuteBarWriter(object): Each individual asset's data is stored as a bcolz table with a column for each pricing field: (open, high, low, close, volume) - The open, high, low, and close columns are integers which are 1000 times + The open, high, low, close and volume columns are integers which are 10^8 times the quoted price, so that the data can represented and stored as an - np.uint32, supporting market prices quoted up to the thousands place. - - volume is a np.uint32 with no mutation of the tens place. + np.uint64, supporting market prices quoted up to the 1/10^8-th place. The 'index' for each individual asset are a repeating period of minutes of length `minutes_per_day` starting from each market open. @@ -573,7 +573,7 @@ class BcolzMinuteBarWriter(object): if not os.path.exists(sid_containing_dirname): # Other sids may have already created the containing directory. os.makedirs(sid_containing_dirname) - initial_array = np.empty(0, np.uint32) + initial_array = np.empty(0, np.uint64) table = ctable( rootdir=path, columns=[ @@ -610,7 +610,7 @@ class BcolzMinuteBarWriter(object): minute_offset = len(table) % self._minutes_per_day num_to_prepend = numdays * self._minutes_per_day - minute_offset - prepend_array = np.zeros(num_to_prepend, np.uint32) + prepend_array = np.zeros(num_to_prepend, np.uint64) # Fill all OHLCV with zeros. table.append([prepend_array] * 5) table.flush() @@ -815,11 +815,11 @@ class BcolzMinuteBarWriter(object): minutes_count = all_minutes_in_window.size - open_col = np.zeros(minutes_count, dtype=np.uint32) - high_col = np.zeros(minutes_count, dtype=np.uint32) - low_col = np.zeros(minutes_count, dtype=np.uint32) - close_col = np.zeros(minutes_count, dtype=np.uint32) - vol_col = np.zeros(minutes_count, dtype=np.uint32) + open_col = np.zeros(minutes_count, dtype=np.uint64) + high_col = np.zeros(minutes_count, dtype=np.uint64) + low_col = np.zeros(minutes_count, dtype=np.uint64) + close_col = np.zeros(minutes_count, dtype=np.uint64) + vol_col = np.zeros(minutes_count, dtype=np.uint64) dt_ixs = np.searchsorted(all_minutes_in_window.values, dts.astype('datetime64[ns]')) @@ -1125,8 +1125,8 @@ class BcolzMinuteBarReader(MinuteBarReader): else: return np.nan - if field != 'volume': - value *= self._ohlc_ratio_inverse_for_sid(sid) + #if field != 'volume': + value *= self._ohlc_ratio_inverse_for_sid(sid) return value def get_last_traded_dt(self, asset, dt): @@ -1248,7 +1248,7 @@ class BcolzMinuteBarReader(MinuteBarReader): if field != 'volume': out = np.full(shape, np.nan) else: - out = np.zeros(shape, dtype=np.uint32) + out = np.zeros(shape, dtype=np.uint64) for i, sid in enumerate(sids): carray = self._open_minute_file(field, sid) @@ -1262,11 +1262,11 @@ class BcolzMinuteBarReader(MinuteBarReader): where = values != 0 # first slice down to len(where) because we might not have # written data for all the minutes requested - if field != 'volume': - out[:len(where), i][where] = ( + #if field != 'volume': + out[:len(where), i][where] = ( values[where] * self._ohlc_ratio_inverse_for_sid(sid)) - else: - out[:len(where), i][where] = values[where] + #else: + # out[:len(where), i][where] = values[where] results.append(out) return results From df8ba90236c43a9b982884a24ba5c11f3c7f7d16 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Thu, 21 Sep 2017 12:43:29 -0600 Subject: [PATCH 18/29] {exchange}/symbols.json moved to AWS --- catalyst/exchange/exchange_utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 2638c1c9..1bf74a68 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -9,9 +9,8 @@ from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \ ExchangeSymbolsNotFound from catalyst.utils.paths import data_root, ensure_directory -# TODO: move to aws -SYMBOLS_URL = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \ - 'master/catalyst/exchange/{exchange}/symbols.json' +SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \ + '{exchange}/symbols.json' def get_exchange_folder(exchange_name, environ=None): From 8cabb33372cdaea8566bcc9926a756ff7d0ab758 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 22 Sep 2017 09:54:51 -0600 Subject: [PATCH 19/29] Autogeneration of symbols.json for bitfinex --- catalyst/exchange/bitfinex/bitfinex.py | 13 +++++++++++++ catalyst/exchange/exchange_utils.py | 10 ++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index fe32991a..01ba45d2 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -23,6 +23,7 @@ from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ ExchangeStopLimitOrder, ExchangeStopOrder from catalyst.finance.order import Order, ORDER_STATUS from catalyst.protocol import Account +from catalyst.exchange.exchange_utils import get_exchange_symbols_filename # Trying to account for REST api instability # https://stackoverflow.com/questions/15431044/can-i-set-max-retries-for-requests-request @@ -527,3 +528,15 @@ class Bitfinex(Exchange): log.debug('got tickers {}'.format(ticks)) return ticks + + def generate_symbols_json(self, filename=None): + response = self._request('symbols', None) + symbols={} + for symbol in response.json(): + symbols[symbol]= {"symbol":symbol[:-3]+'_'+symbol[-3:], "start_date": "2010-01-01"} + + if(filename is None): + filename = get_exchange_symbols_filename(self.name) + + with open(filename,'w') as f: + json.dump(symbols, f, sort_keys=True, indent=2, separators=(',',':')) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 1bf74a68..ebb33023 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -24,18 +24,20 @@ def get_exchange_folder(exchange_name, environ=None): return exchange_folder -def download_exchange_symbols(exchange_name, environ=None): +def get_exchange_symbols_filename(exchange_name, environ=None): exchange_folder = get_exchange_folder(exchange_name, environ) - filename = os.path.join(exchange_folder, 'symbols.json') + return os.path.join(exchange_folder, 'symbols.json') + +def download_exchange_symbols(exchange_name, environ=None): + filename = get_exchange_symbols_filename(exchange_name) url = SYMBOLS_URL.format(exchange=exchange_name) response = urllib.urlretrieve(url=url, filename=filename) return response def get_exchange_symbols(exchange_name, environ=None): - exchange_folder = get_exchange_folder(exchange_name, environ) - filename = os.path.join(exchange_folder, 'symbols.json') + filename = get_exchange_symbols_filename(exchange_name) if not os.path.isfile(filename): download_exchange_symbols(exchange_name, environ) From daf3c4d285d12b6c9a09e23c88514395535a8e0c Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 22 Sep 2017 10:55:49 -0600 Subject: [PATCH 20/29] Autogeneration of symbols.json for bittrex --- catalyst/exchange/bitfinex/bitfinex.py | 6 ++-- catalyst/exchange/bittrex/bittrex.py | 47 ++++++++++++-------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index 01ba45d2..cb33c3ee 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -530,13 +530,13 @@ class Bitfinex(Exchange): return ticks def generate_symbols_json(self, filename=None): + symbol_map = {} response = self._request('symbols', None) - symbols={} for symbol in response.json(): - symbols[symbol]= {"symbol":symbol[:-3]+'_'+symbol[-3:], "start_date": "2010-01-01"} + symbol_map[symbol]= {"symbol":symbol[:-3]+'_'+symbol[-3:], "start_date": "2010-01-01"} if(filename is None): filename = get_exchange_symbols_filename(self.name) with open(filename,'w') as f: - json.dump(symbols, f, sort_keys=True, indent=2, separators=(',',':')) + json.dump(symbol_map, f, sort_keys=True, indent=2, separators=(',',':')) diff --git a/catalyst/exchange/bittrex/bittrex.py b/catalyst/exchange/bittrex/bittrex.py index f56073c5..6c0dfbc1 100644 --- a/catalyst/exchange/bittrex/bittrex.py +++ b/catalyst/exchange/bittrex/bittrex.py @@ -12,6 +12,8 @@ from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ CreateOrderError from catalyst.finance.execution import LimitOrder, StopLimitOrder from catalyst.finance.order import Order, ORDER_STATUS +from catalyst.exchange.exchange_utils import get_exchange_symbols_filename + log = Logger('Bittrex') @@ -50,31 +52,6 @@ class Bittrex(Exchange): """ return exchange_symbol.lower() - def fetch_symbol_map(self): - """ - Since Bittrex gives us a complete dictionary of symbols, - we can build the symbol map ad-hoc as opposed to maintaining - a static file. We must be careful with mapping any unconventional - symbol name as appropriate. - - :return symbol_map: - """ - symbol_map = dict() - - markets = self.api.getmarkets() - for market in markets: - exchange_symbol = market['MarketName'] - symbol = '{market}_{base}'.format( - market=self.sanitize_curency_symbol(market['MarketCurrency']), - base=self.sanitize_curency_symbol(market['BaseCurrency']) - ) - symbol_map[exchange_symbol] = dict( - symbol=symbol, - start_date=pd.to_datetime(market['Created'], utc=True) - ) - - return symbol_map - def get_balances(self): try: log.debug('retrieving wallet balances') @@ -316,3 +293,23 @@ class Bittrex(Exchange): def get_account(self): log.info('retrieving account data') pass + + def generate_symbols_json(self, filename=None): + symbol_map = {} + markets = self.api.getmarkets() + for market in markets: + exchange_symbol = market['MarketName'] + symbol = '{market}_{base}'.format( + market=self.sanitize_curency_symbol(market['MarketCurrency']), + base=self.sanitize_curency_symbol(market['BaseCurrency']) + ) + symbol_map[exchange_symbol] = dict( + symbol=symbol, + start_date=pd.to_datetime(market['Created'], utc=True).strftime("%Y-%m-%d") + ) + + if(filename is None): + filename = get_exchange_symbols_filename(self.name) + + with open(filename,'w') as f: + json.dump(symbol_map, f, sort_keys=True, indent=2, separators=(',',':')) From 75c2753b98ee0136216a404d901e8c1132483691 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 22 Sep 2017 11:59:57 -0600 Subject: [PATCH 21/29] matplotlib imports inside init live_graph_clock --- catalyst/exchange/live_graph_clock.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/catalyst/exchange/live_graph_clock.py b/catalyst/exchange/live_graph_clock.py index 60e7503b..e84ee719 100644 --- a/catalyst/exchange/live_graph_clock.py +++ b/catalyst/exchange/live_graph_clock.py @@ -12,7 +12,6 @@ # limitations under the License. from datetime import timedelta -import matplotlib.dates as mdates import pandas as pd from catalyst.gens.sim_engine import ( BAR, @@ -24,8 +23,6 @@ from matplotlib import style log = Logger('LiveGraphClock') -fmt = mdates.DateFormatter('%Y-%m-%d %H:%M') - class LiveGraphClock(object): """Realtime clock for live trading. @@ -55,11 +52,17 @@ class LiveGraphClock(object): def __init__(self, sessions, context, time_skew=pd.Timedelta('0s')): + import matplotlib.dates as mdates + from matplotlib import pyplot as plt + from matplotlib import style + + self.sessions = sessions self.time_skew = time_skew self._last_emit = None self._before_trading_start_bar_yielded = True self.context = context + self.fmt = mdates.DateFormatter('%Y-%m-%d %H:%M') style.use('dark_background') @@ -97,7 +100,7 @@ class LiveGraphClock(object): :return: """ ax.xaxis.set_major_locator(mdates.DayLocator(interval=1)) - ax.xaxis.set_major_formatter(fmt) + ax.xaxis.set_major_formatter(self.fmt) locator = mdates.HourLocator(interval=4) locator.MAXTICKS = 5000 From 27f20a090a7bb61fc64fcbd68016397cc01a80fb Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Fri, 22 Sep 2017 12:03:17 -0600 Subject: [PATCH 22/29] matplotlib imports inside init live_graph_clock (2) --- catalyst/exchange/live_graph_clock.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/catalyst/exchange/live_graph_clock.py b/catalyst/exchange/live_graph_clock.py index e84ee719..59b2c3f4 100644 --- a/catalyst/exchange/live_graph_clock.py +++ b/catalyst/exchange/live_graph_clock.py @@ -18,8 +18,6 @@ from catalyst.gens.sim_engine import ( SESSION_START ) from logbook import Logger -from matplotlib import pyplot as plt -from matplotlib import style log = Logger('LiveGraphClock') From f60abcd6363f6425173abb99446b778ce7ab7429 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Mon, 25 Sep 2017 11:28:06 -0600 Subject: [PATCH 23/29] WIP: Poloniex exchange class --- catalyst/exchange/poloniex/__init__.py | 0 catalyst/exchange/poloniex/poloniex.py | 508 +++++++++++++++++++++ catalyst/exchange/poloniex/poloniex_api.py | 126 +++++ catalyst/utils/run_algo.py | 10 +- 4 files changed, 643 insertions(+), 1 deletion(-) create mode 100644 catalyst/exchange/poloniex/__init__.py create mode 100644 catalyst/exchange/poloniex/poloniex.py create mode 100644 catalyst/exchange/poloniex/poloniex_api.py diff --git a/catalyst/exchange/poloniex/__init__.py b/catalyst/exchange/poloniex/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py new file mode 100644 index 00000000..bf8069b8 --- /dev/null +++ b/catalyst/exchange/poloniex/poloniex.py @@ -0,0 +1,508 @@ +import base64 +import hashlib +import hmac +import json +import re +import time + +import numpy as np +import pandas as pd +import pytz +import requests +import six +from catalyst.assets._assets import TradingPair +from logbook import Logger + +from catalyst.exchange.poloniex.poloniex_api import Poloniex_api + + +# from websocket import create_connection +from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + InvalidHistoryFrequencyError, + InvalidOrderStyle, OrderCancelError) +from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ + ExchangeStopLimitOrder, ExchangeStopOrder +from catalyst.finance.order import Order, ORDER_STATUS +from catalyst.protocol import Account +from catalyst.exchange.exchange_utils import get_exchange_symbols_filename + + +log = Logger('Poloniex') + + +class Poloniex(Exchange): + def __init__(self, key, secret, base_currency, portfolio=None): + self.api = Poloniex_api(key=key, secret=secret.encode('UTF-8')) + self.name = 'poloniex' + self.assets = {} + #self.load_assets() + self.base_currency = base_currency + self._portfolio = portfolio + self.minute_writer = None + self.minute_reader = None + + def sanitize_curency_symbol(self, exchange_symbol): + """ + Helper method used to build the universal pair. + Include any symbol mapping here if appropriate. + + :param exchange_symbol: + :return universal_symbol: + """ + return exchange_symbol.lower() + + ''' + def _create_order(self, order_status): + """ + Create a Catalyst order object from a Bitfinex order dictionary + :param order_status: + :return: Order + """ + if order_status['is_cancelled']: + status = ORDER_STATUS.CANCELLED + elif not order_status['is_live']: + log.info('found executed order {}'.format(order_status)) + status = ORDER_STATUS.FILLED + else: + status = ORDER_STATUS.OPEN + + amount = float(order_status['original_amount']) + filled = float(order_status['executed_amount']) + + if order_status['side'] == 'sell': + amount = -amount + filled = -filled + + price = float(order_status['price']) + order_type = order_status['type'] + + stop_price = None + limit_price = None + + # TODO: is this comprehensive enough? + if order_type.endswith('limit'): + limit_price = price + elif order_type.endswith('stop'): + stop_price = price + + executed_price = float(order_status['avg_execution_price']) + + # TODO: bitfinex does not specify comission. I could calculate it but not sure if it's worth it. + commission = None + + date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) + date = pytz.utc.localize(date) + order = Order( + dt=date, + asset=self.assets[order_status['symbol']], + amount=amount, + stop=stop_price, + limit=limit_price, + filled=filled, + id=str(order_status['id']), + commission=commission + ) + order.status = status + + return order, executed_price + ''' + + def get_balances(self): + pass + ''' + log.debug('retrieving wallets balances') + try: + response = self._request('balances', None) + balances = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in balances: + raise ExchangeRequestError( + error='unable to fetch balance {}'.format(balances['message']) + ) + + std_balances = dict() + for balance in balances: + currency = balance['currency'].lower() + std_balances[currency] = float(balance['available']) + + return std_balances + ''' + + @property + def account(self): + account = Account() + + account.settled_cash = None + account.accrued_interest = None + account.buying_power = None + account.equity_with_loan = None + account.total_positions_value = None + account.total_positions_exposure = None + account.regt_equity = None + account.regt_margin = None + account.initial_margin_requirement = None + account.maintenance_margin_requirement = None + account.available_funds = None + account.excess_liquidity = None + account.cushion = None + account.day_trades_remaining = None + account.leverage = None + account.net_leverage = None + account.net_liquidation = None + + return account + + @property + def time_skew(self): + # TODO: research the time skew conditions + return pd.Timedelta('0s') + + def get_account(self): + # TODO: fetch account data and keep in cache + return None + + def get_candles(self, data_frequency, assets, bar_count=None): + pass + ''' + """ + Retrieve OHLVC candles from Bitfinex + + :param data_frequency: + :param assets: + :param bar_count: + :return: + + Available Frequencies + --------------------- + '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', + '1M' + """ + + # TODO: use BcolzMinuteBarReader to read from cache + freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) + if freq_match: + number = int(freq_match.group(1)) + unit = freq_match.group(2) + + if unit == 'd': + converted_unit = 'D' + else: + converted_unit = unit + + frequency = '{}{}'.format(number, converted_unit) + allowed_frequencies = ['1m', '5m', '15m', '30m', '1h', '3h', '6h', + '12h', '1D', '7D', '14D', '1M'] + + if frequency not in allowed_frequencies: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + elif data_frequency == 'minute': + frequency = '1m' + elif data_frequency == 'daily': + frequency = '1D' + else: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + + # Making sure that assets are iterable + asset_list = [assets] if isinstance(assets, TradingPair) else assets + ohlc_map = dict() + for asset in asset_list: + symbol = self._get_v2_symbol(asset) + url = '{url}/v2/candles/trade:{frequency}:{symbol}'.format( + url=self.url, + frequency=frequency, + symbol=symbol + ) + + if bar_count: + is_list = True + url += '/hist?limit={}'.format(int(bar_count)) + else: + is_list = False + url += '/last' + + try: + response = requests.get(url) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve candles: {}'.format( + response.content) + ) + + candles = response.json() + + def ohlc_from_candle(candle): + ohlc = dict( + open=np.float64(candle[1]), + high=np.float64(candle[3]), + low=np.float64(candle[4]), + close=np.float64(candle[2]), + volume=np.float64(candle[5]), + price=np.float64(candle[2]), + last_traded=pd.Timestamp.utcfromtimestamp( + candle[0] / 1000.0) + ) + return ohlc + + if is_list: + ohlc_bars = [] + # We can to list candles from old to new + for candle in reversed(candles): + ohlc = ohlc_from_candle(candle) + ohlc_bars.append(ohlc) + + ohlc_map[asset] = ohlc_bars + + else: + ohlc = ohlc_from_candle(candles) + ohlc_map[asset] = ohlc + + return ohlc_map[assets] \ + if isinstance(assets, TradingPair) else ohlc_map + ''' + + def create_order(self, asset, amount, is_buy, style): + pass + ''' + """ + Creating order on the exchange. + + :param asset: + :param amount: + :param is_buy: + :param style: + :return: + """ + exchange_symbol = self.get_symbol(asset) + if isinstance(style, ExchangeLimitOrder) \ + or isinstance(style, ExchangeStopLimitOrder): + price = style.get_limit_price(is_buy) + order_type = 'limit' + + elif isinstance(style, ExchangeStopOrder): + price = style.get_stop_price(is_buy) + order_type = 'stop' + + else: + raise InvalidOrderStyle(exchange=self.name, + style=style.__class__.__name__) + + req = dict( + symbol=exchange_symbol, + amount=str(float(abs(amount))), + price="{:.20f}".format(float(price)), + side='buy' if is_buy else 'sell', + type='exchange ' + order_type, # TODO: support margin trades + exchange=self.name, + is_hidden=False, + is_postonly=False, + use_all_available=0, + ocoorder=False, + buy_price_oco=0, + sell_price_oco=0 + ) + + date = pd.Timestamp.utcnow() + try: + response = self._request('order/new', req) + order_status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_status: + raise ExchangeRequestError( + error='unable to create Bitfinex order {}'.format( + order_status['message']) + ) + + order_id = str(order_status['id']) + order = Order( + dt=date, + asset=asset, + amount=amount, + stop=style.get_stop_price(is_buy), + limit=style.get_limit_price(is_buy), + id=order_id + ) + + return order + ''' + + def get_open_orders(self, asset=None): + """Retrieve all of the current open orders. + + Parameters + ---------- + asset : Asset + If passed and not None, return only the open orders for the given + asset instead of all open orders. + + Returns + ------- + open_orders : dict[list[Order]] or list[Order] + If no asset is passed this will return a dict mapping Assets + to a list containing all the open orders for the asset. + If an asset is passed then this will return a list of the open + orders for this asset. + """ + pass + ''' + try: + response = self._request('orders', None) + order_statuses = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_statuses: + raise ExchangeRequestError( + error='Unable to retrieve open orders: {}'.format( + order_statuses['message']) + ) + + orders = list() + for order_status in order_statuses: + order, executed_price = self._create_order(order_status) + if asset is None or asset == order.sid: + orders.append(order) + + return orders + ''' + + def get_order(self, order_id): + """Lookup an order based on the order id returned from one of the + order functions. + + Parameters + ---------- + order_id : str + The unique identifier for the order. + + Returns + ------- + order : Order + The order object. + """ + pass + ''' + try: + response = self._request( + 'order/status', {'order_id': int(order_id)}) + order_status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_status: + raise ExchangeRequestError( + error='Unable to retrieve order status: {}'.format( + order_status['message']) + ) + return self._create_order(order_status) + ''' + + def cancel_order(self, order_param): + """Cancel an open order. + + Parameters + ---------- + order_param : str or Order + The order_id or order object to cancel. + """ + pass + ''' + order_id = order_param.id \ + if isinstance(order_param, Order) else order_param + + try: + response = self._request('order/cancel', {'order_id': order_id}) + status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in status: + raise OrderCancelError( + order_id=order_id, + exchange=self.name, + error=status['message'] + ) + ''' + + def tickers(self, assets): + """ + Fetch ticket data for assets + https://docs.bitfinex.com/v2/reference#rest-public-tickers + + :param assets: + :return: + """ + pass + + ''' + symbols = self._get_v2_symbols(assets) + log.debug('fetching tickers {}'.format(symbols)) + + try: + response = requests.get( + '{url}/v2/tickers?symbols={symbols}'.format( + url=self.url, + symbols=','.join(symbols), + ) + ) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve tickers: {}'.format( + response.content) + ) + + tickers = response.json() + + ticks = dict() + for index, ticker in enumerate(tickers): + if not len(ticker) == 11: + raise ExchangeRequestError( + error='Invalid ticker in response: {}'.format(ticker) + ) + + ticks[assets[index]] = dict( + timestamp=pd.Timestamp.utcnow(), + bid=ticker[1], + ask=ticker[3], + last_price=ticker[7], + low=ticker[10], + high=ticker[9], + volume=ticker[8], + ) + + log.debug('got tickers {}'.format(ticks)) + return ticks + ''' + + def generate_symbols_json(self, filename=None): + symbol_map = {} + response = self.api.returnticker() + for exchange_symbol in response: + base, market = self.sanitize_curency_symbol(exchange_symbol).split('_') + symbol = '{market}_{base}'.format( market=market, base=base ) + symbol_map[exchange_symbol] = dict( + symbol = symbol, + start_date = '2010-01-01' + ) + + if(filename is None): + filename = get_exchange_symbols_filename(self.name) + + with open(filename,'w') as f: + json.dump(symbol_map, f, sort_keys=True, indent=2, separators=(',',':')) + diff --git a/catalyst/exchange/poloniex/poloniex_api.py b/catalyst/exchange/poloniex/poloniex_api.py new file mode 100644 index 00000000..5094dd9f --- /dev/null +++ b/catalyst/exchange/poloniex/poloniex_api.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +import json +import time +import hmac +import hashlib + +from six.moves import urllib + +# Workaround for backwards compatibility +# https://stackoverflow.com/questions/3745771/urllib-request-in-python-2-7 +urlopen = urllib.request.urlopen + + +class Poloniex_api(object): + def __init__(self, key, secret): + self.key = key + self.secret = secret + self.public = ['returnTicker', 'return24Volume', 'returnOrderBook', + 'returnTradeHistory', 'returnChartData', + 'returnCurrencies', 'returnLoanOrders'] + self.trading = ['returnBalances','returnCompleteBalances','returnDepositAddresses', + 'generateNewAddress','returnDepositsWithdrawals','returnOpenOrders', + 'returnTradeHistory','returnOrderTrades', + 'buy', 'sell', 'cancelOrder', 'moveOrder', + 'withdraw', 'returnFeeInfo','returnAvailableAccountBalances', + 'returnTradableBalances', 'transferBalance', + 'returnMarginAccountSummary','marginBuy','marginSell', + 'getMarginPosition', 'closeMarginPosition','createLoanOffer', + 'cancelLoanOffer','returnOpenLoanOffers','returnActiveLoans', + 'returnLendingHistory','toggleAutoRenew'] + + def query(self, method, values={}): + + if method in self.public: + url = 'https://poloniex.com/public?command=' + method + urllib.parse.urlencode(values) + headers = {} + post_data = None + elif method in self.trading: + url = 'https://poloniex.com/tradingApi' + req['command'] = method + req['nonce'] = int(time.time()*1000) + post_data = urllib.urlencode(req) + signature = hmac.new(self.secret, post_data, hashlib.sha512).hexdigest() + headers = { 'Sign': signature, 'Key': self.key} + + req = urllib.request.Request(url, data=post_data, headers=headers) + return json.loads(urlopen(req).read()) + + def returnticker(self): + return self.query('returnTicker') + + def return24volume(self): + return self.query('return24Volume') + + def returnOrderBook(self, market='all'): + return self.query('returnOrderBook', {'currencyPair': market}) + + def returntradehistory(self, market, start=None, end=None): + if(start is not None and end is not None): + return self.query('returntradehistory', + {'currencyPair': market, 'start': start, 'end': end }) + else: + return self.query('returntradehistory', {'currencyPair': market }) + + def returnchartdata(self, market, period, start, end): + return self.query('returnChartData', {'currencyPair': market, 'period': period, + 'start': start, 'end': end}) + + def returncurrencies(self): + return self.query('returnCurrencies') + + def returnloadorders(self, market): + return self.query('returnLoanOrders', {'market': market}) + + ''' + def buylimit(self, market, quantity, rate): + return self.query('buylimit', {'market': market, 'quantity': quantity, + 'rate': rate}) + + def buymarket(self, market, quantity): + return self.query('buymarket', + {'market': market, 'quantity': quantity}) + + def selllimit(self, market, quantity, rate): + return self.query('selllimit', {'market': market, 'quantity': quantity, + 'rate': rate}) + + def sellmarket(self, market, quantity): + return self.query('sellmarket', + {'market': market, 'quantity': quantity}) + + def cancel(self, uuid): + return self.query('cancel', {'uuid': uuid}) + + def getopenorders(self, market): + return self.query('getopenorders', {'market': market}) + + def getbalances(self): + return self.query('getbalances') + + def getbalance(self, currency): + return self.query('getbalance', {'currency': currency}) + + def getdepositaddress(self, currency): + return self.query('getdepositaddress', {'currency': currency}) + + def withdraw(self, currency, quantity, address): + return self.query('withdraw', + {'currency': currency, 'quantity': quantity, + 'address': address}) + + def getorder(self, uuid): + return self.query('getorder', {'uuid': uuid}) + + def getorderhistory(self, market, count): + return self.query('getorderhistory', + {'market': market, 'count': count}) + + def getwithdrawalhistory(self, currency, count): + return self.query('getwithdrawalhistory', + {'currency': currency, 'count': count}) + + def getdeposithistory(self, currency, count): + return self.query('getdeposithistory', + {'currency': currency, 'count': count}) + ''' diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index fc9f6354..63063c55 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -11,6 +11,8 @@ import pandas as pd import click from catalyst.exchange.bittrex.bittrex import Bittrex +from catalyst.exchange.bitfinex.bitfinex import Bitfinex +from catalyst.exchange.poloniex.poloniex import Poloniex try: from pygments import highlight @@ -39,7 +41,6 @@ import catalyst.utils.paths as pth from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm from catalyst.exchange.data_portal_exchange import DataPortalExchange -from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.exchange_portfolio import ExchangePortfolio from catalyst.exchange.exchange_errors import ( @@ -178,6 +179,13 @@ def _run(handle_data, base_currency=base_currency, portfolio=portfolio ) + elif exchange_name == 'poloniex': + exchange = Poloniex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=base_currency, + portfolio=portfolio + ) else: raise NotImplementedError( 'exchange not supported: %s' % exchange_name) From 5d1bdee4a62aabfc954c126e3f108cea743d4428 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Mon, 25 Sep 2017 14:35:58 -0600 Subject: [PATCH 24/29] WIP: Poloniex exchange - generating symbols.json --- catalyst/curate/poloniex.py | 39 ++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/catalyst/curate/poloniex.py b/catalyst/curate/poloniex.py index 911ac25f..e2a88476 100644 --- a/catalyst/curate/poloniex.py +++ b/catalyst/curate/poloniex.py @@ -2,10 +2,13 @@ import json, time, csv from datetime import datetime import pandas as pd import os, time, shutil, requests, logbook +from catalyst.exchange.exchange_utils import get_exchange_symbols_filename + DT_START = int(time.mktime(datetime(2010, 1, 1, 0, 0).timetuple())) DT_END = int(time.time()) CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' +CSV_OUT_FOLDER = '/Volumes/enigma/data/poloniex/' CONN_RETRIES = 2 logbook.StderrHandler().push_application() @@ -247,11 +250,45 @@ class PoloniexCurator(object): df.set_index('date', inplace=True) return df[start : end] + ''' + Generates a symbols.json file with corresponding start_date for each currencyPair + ''' + def generate_symbols_json(self, filename=None): + symbol_map = {} + + if(filename is None): + filename = get_exchange_symbols_filename('poloniex') + + with open(filename, 'w') as symbols: + for currencyPair in self.currency_pairs: + start = None + csv_fn = CSV_OUT_FOLDER + 'crypto_trades-' + currencyPair + '.csv' + with open(csv_fn, 'r') as f: + f.seek(0, os.SEEK_END) + if(f.tell() > 2): # First check file is not zero size + f.seek(-2, os.SEEK_END) # Jump to the second last byte. + while f.read(1) != b"\n": # Until EOL is found... + f.seek(-2, os.SEEK_CUR) # ...jump back the read byte plus one more. + start = pd.to_datetime( f.readline().split(',')[1], infer_datetime_format=True) + + if(start is None): + start = time.gmtime() + base, market = currencyPair.lower().split('_') + symbol = '{market}_{base}'.format( market=market, base=base ) + symbol_map[currencyPair] = dict( + symbol = symbol, + start_date = start.strftime("%Y-%m-%d") + ) + json.dump(symbol_map, symbols, sort_keys=True, indent=2, separators=(',',':')) + if __name__ == '__main__': pc = PoloniexCurator() pc.get_currency_pairs() - + #pc.generate_symbols_json() + for currencyPair in pc.currency_pairs: pc.retrieve_trade_history(currencyPair) pc.write_ohlcv_file(currencyPair) + + \ No newline at end of file From cf20f78e5532048697df18c5c42d3d54f5564b4e Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Mon, 25 Sep 2017 22:01:04 -0600 Subject: [PATCH 25/29] WIP: Poloniex exchange - balances, candles & cancel --- catalyst/exchange/poloniex/poloniex.py | 197 +++++++++------------ catalyst/exchange/poloniex/poloniex_api.py | 29 +-- 2 files changed, 103 insertions(+), 123 deletions(-) diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index bf8069b8..2b5b3586 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -9,7 +9,8 @@ import numpy as np import pandas as pd import pytz import requests -import six +#import six +from six import iteritems from catalyst.assets._assets import TradingPair from logbook import Logger @@ -37,12 +38,13 @@ class Poloniex(Exchange): self.api = Poloniex_api(key=key, secret=secret.encode('UTF-8')) self.name = 'poloniex' self.assets = {} - #self.load_assets() + self.load_assets() self.base_currency = base_currency self._portfolio = portfolio self.minute_writer = None self.minute_reader = None + def sanitize_curency_symbol(self, exchange_symbol): """ Helper method used to build the universal pair. @@ -53,47 +55,51 @@ class Poloniex(Exchange): """ return exchange_symbol.lower() - ''' + def _create_order(self, order_status): """ - Create a Catalyst order object from a Bitfinex order dictionary + Create a Catalyst order object from the Exchange order dictionary :param order_status: :return: Order """ - if order_status['is_cancelled']: - status = ORDER_STATUS.CANCELLED - elif not order_status['is_live']: - log.info('found executed order {}'.format(order_status)) - status = ORDER_STATUS.FILLED - else: - status = ORDER_STATUS.OPEN + #if order_status['is_cancelled']: + # status = ORDER_STATUS.CANCELLED + #elif not order_status['is_live']: + # log.info('found executed order {}'.format(order_status)) + # status = ORDER_STATUS.FILLED + #else: + status = ORDER_STATUS.OPEN - amount = float(order_status['original_amount']) - filled = float(order_status['executed_amount']) + amount = float(order_status['amount']) + #filled = float(order_status['executed_amount']) + filled = None - if order_status['side'] == 'sell': + if order_status['type'] == 'sell': amount = -amount - filled = -filled + #filled = -filled - price = float(order_status['price']) + price = float(order_status['rate']) order_type = order_status['type'] stop_price = None limit_price = None # TODO: is this comprehensive enough? - if order_type.endswith('limit'): - limit_price = price - elif order_type.endswith('stop'): - stop_price = price + #if order_type.endswith('limit'): + # limit_price = price + #elif order_type.endswith('stop'): + # stop_price = price - executed_price = float(order_status['avg_execution_price']) + #executed_price = float(order_status['avg_execution_price']) + executed_price = price # TODO: bitfinex does not specify comission. I could calculate it but not sure if it's worth it. commission = None - date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) - date = pytz.utc.localize(date) + #date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) + #date = pytz.utc.localize(date) + date = None + order = Order( dt=date, asset=self.assets[order_status['symbol']], @@ -101,36 +107,34 @@ class Poloniex(Exchange): stop=stop_price, limit=limit_price, filled=filled, - id=str(order_status['id']), + id=str(order_status['orderNumber']), commission=commission ) order.status = status return order, executed_price - ''' + def get_balances(self): - pass - ''' log.debug('retrieving wallets balances') try: - response = self._request('balances', None) - balances = response.json() + balances = self.api.returnbalances() except Exception as e: + log.debug(e) raise ExchangeRequestError(error=e) - if 'message' in balances: + if 'error' in balances: raise ExchangeRequestError( - error='unable to fetch balance {}'.format(balances['message']) + error='unable to fetch balance {}'.format(balances['error']) ) std_balances = dict() - for balance in balances: - currency = balance['currency'].lower() - std_balances[currency] = float(balance['available']) + for (key, value) in iteritems(balances): + currency = key.lower() + std_balances[currency] = float(value) return std_balances - ''' + @property def account(self): @@ -166,10 +170,8 @@ class Poloniex(Exchange): return None def get_candles(self, data_frequency, assets, bar_count=None): - pass - ''' """ - Retrieve OHLVC candles from Bitfinex + Retrieve OHLVC candles from Poloniex :param data_frequency: :param assets: @@ -178,33 +180,22 @@ class Poloniex(Exchange): Available Frequencies --------------------- - '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', - '1M' + '5m', '15m', '30m', '2h', '4h', '1D' """ # TODO: use BcolzMinuteBarReader to read from cache - freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) - if freq_match: - number = int(freq_match.group(1)) - unit = freq_match.group(2) - - if unit == 'd': - converted_unit = 'D' - else: - converted_unit = unit - - frequency = '{}{}'.format(number, converted_unit) - allowed_frequencies = ['1m', '5m', '15m', '30m', '1h', '3h', '6h', - '12h', '1D', '7D', '14D', '1M'] - - if frequency not in allowed_frequencies: - raise InvalidHistoryFrequencyError( - frequency=data_frequency - ) - elif data_frequency == 'minute': - frequency = '1m' - elif data_frequency == 'daily': - frequency = '1D' + if(data_frequency == '5m' or data_frequency == 'minute'): #TODO: Polo does not have '1m' + frequency = 300 + elif(data_frequency == '15m'): + frequency = 900 + elif(data_frequency == '30m'): + frequency = 1800 + elif(data_frequency == '2h'): + frequency = 7200 + elif(data_frequency == '4h'): + frequency = 14400 + elif(data_frequency == '1D' or data_frequency == 'daily'): + frequency = 86400 else: raise InvalidHistoryFrequencyError( frequency=data_frequency @@ -213,63 +204,51 @@ class Poloniex(Exchange): # Making sure that assets are iterable asset_list = [assets] if isinstance(assets, TradingPair) else assets ohlc_map = dict() + for asset in asset_list: - symbol = self._get_v2_symbol(asset) - url = '{url}/v2/candles/trade:{frequency}:{symbol}'.format( - url=self.url, - frequency=frequency, - symbol=symbol - ) - if bar_count: - is_list = True - url += '/hist?limit={}'.format(int(bar_count)) + end = int(time.time()) + if(bar_count is None): + start = end - 2 * frequency else: - is_list = False - url += '/last' + start = end - bar_count * frequency - try: - response = requests.get(url) + try: + response = self.api.returnchartdata(self.get_symbol(asset),frequency, start, end) except Exception as e: raise ExchangeRequestError(error=e) - if 'error' in response.content: + if 'error' in response: raise ExchangeRequestError( error='Unable to retrieve candles: {}'.format( response.content) ) - candles = response.json() - def ohlc_from_candle(candle): ohlc = dict( - open=np.float64(candle[1]), - high=np.float64(candle[3]), - low=np.float64(candle[4]), - close=np.float64(candle[2]), - volume=np.float64(candle[5]), - price=np.float64(candle[2]), - last_traded=pd.Timestamp.utcfromtimestamp( - candle[0] / 1000.0) + open=np.float64(candle['open']), + high=np.float64(candle['high']), + low=np.float64(candle['low']), + close=np.float64(candle['close']), + volume=np.float64(candle['volume']), + price=np.float64(candle['close']), + last_traded=pd.Timestamp.utcfromtimestamp( candle['date'] ) ) + return ohlc - if is_list: + if bar_count is None: + ohlc_map[asset] = ohlc_from_candle(response[0]) + else: ohlc_bars = [] - # We can to list candles from old to new - for candle in reversed(candles): + for candle in response: ohlc = ohlc_from_candle(candle) ohlc_bars.append(ohlc) - ohlc_map[asset] = ohlc_bars - else: - ohlc = ohlc_from_candle(candles) - ohlc_map[asset] = ohlc - return ohlc_map[assets] \ if isinstance(assets, TradingPair) else ohlc_map - ''' + def create_order(self, asset, amount, is_buy, style): pass @@ -338,7 +317,7 @@ class Poloniex(Exchange): return order ''' - def get_open_orders(self, asset=None): + def get_open_orders(self, asset='all'): """Retrieve all of the current open orders. Parameters @@ -355,28 +334,29 @@ class Poloniex(Exchange): If an asset is passed then this will return a list of the open orders for this asset. """ - pass - ''' try: - response = self._request('orders', None) - order_statuses = response.json() + if(asset=='all'): + response = self.api.returnopenorders('all') + else: + response = self.api.returnopenorders(self.get_symbol(asset)) except Exception as e: raise ExchangeRequestError(error=e) - if 'message' in order_statuses: + if 'error' in response: raise ExchangeRequestError( error='Unable to retrieve open orders: {}'.format( order_statuses['message']) ) + #TODO: Need to handle openOrders for 'all' orders = list() - for order_status in order_statuses: + for order_status in response: order, executed_price = self._create_order(order_status) if asset is None or asset == order.sid: orders.append(order) return orders - ''' + def get_order(self, order_id): """Lookup an order based on the order id returned from one of the @@ -417,24 +397,21 @@ class Poloniex(Exchange): order_param : str or Order The order_id or order object to cancel. """ - pass - ''' order_id = order_param.id \ if isinstance(order_param, Order) else order_param try: - response = self._request('order/cancel', {'order_id': order_id}) - status = response.json() + response = self.api.cancelorder(order_id) except Exception as e: raise ExchangeRequestError(error=e) - if 'message' in status: + if 'error' in response: raise OrderCancelError( order_id=order_id, exchange=self.name, - error=status['message'] + error=response['error'] ) - ''' + def tickers(self, assets): """ diff --git a/catalyst/exchange/poloniex/poloniex_api.py b/catalyst/exchange/poloniex/poloniex_api.py index 5094dd9f..2efdd858 100644 --- a/catalyst/exchange/poloniex/poloniex_api.py +++ b/catalyst/exchange/poloniex/poloniex_api.py @@ -29,19 +29,22 @@ class Poloniex_api(object): 'cancelLoanOffer','returnOpenLoanOffers','returnActiveLoans', 'returnLendingHistory','toggleAutoRenew'] - def query(self, method, values={}): + def query(self, method, req={}): if method in self.public: - url = 'https://poloniex.com/public?command=' + method + urllib.parse.urlencode(values) + url = 'https://poloniex.com/public?command=' + method + '&' + urllib.parse.urlencode(req) headers = {} post_data = None elif method in self.trading: url = 'https://poloniex.com/tradingApi' req['command'] = method req['nonce'] = int(time.time()*1000) - post_data = urllib.urlencode(req) + post_data = urllib.parse.urlencode(req) + print(post_data) signature = hmac.new(self.secret, post_data, hashlib.sha512).hexdigest() headers = { 'Sign': signature, 'Key': self.key} + else: + raise ValueError('Method "' + method + '" not found in neither the Public API or Trading API endpoints') req = urllib.request.Request(url, data=post_data, headers=headers) return json.loads(urlopen(req).read()) @@ -70,7 +73,16 @@ class Poloniex_api(object): return self.query('returnCurrencies') def returnloadorders(self, market): - return self.query('returnLoanOrders', {'market': market}) + return self.query('returnLoanOrders', {'currency': market}) + + def returnbalances(self): + return self.query('returnBalances') + + def returnopenorders(self, market): + return self.query('returnOpenOrders', {'currencyPair': market}) + + def cancelorder(self, ordernumber): + return self.query('cancelOrder', {'orderNumber': ordernumber}) ''' def buylimit(self, market, quantity, rate): @@ -89,15 +101,6 @@ class Poloniex_api(object): return self.query('sellmarket', {'market': market, 'quantity': quantity}) - def cancel(self, uuid): - return self.query('cancel', {'uuid': uuid}) - - def getopenorders(self, market): - return self.query('getopenorders', {'market': market}) - - def getbalances(self): - return self.query('getbalances') - def getbalance(self, currency): return self.query('getbalance', {'currency': currency}) From 2c2c861a8f3629f994ba0dbbc1c7bb86256d4459 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Tue, 26 Sep 2017 11:40:25 -0600 Subject: [PATCH 26/29] WIP: Poloniex exchange - fix for multiple exchanges --- catalyst/exchange/poloniex/poloniex_api.py | 1 - catalyst/utils/run_algo.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/catalyst/exchange/poloniex/poloniex_api.py b/catalyst/exchange/poloniex/poloniex_api.py index 2efdd858..ee5f5198 100644 --- a/catalyst/exchange/poloniex/poloniex_api.py +++ b/catalyst/exchange/poloniex/poloniex_api.py @@ -40,7 +40,6 @@ class Poloniex_api(object): req['command'] = method req['nonce'] = int(time.time()*1000) post_data = urllib.parse.urlencode(req) - print(post_data) signature = hmac.new(self.secret, post_data, hashlib.sha512).hexdigest() headers = { 'Sign': signature, 'Key': self.key} else: diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index de8e64a5..9130d2af 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -170,7 +170,6 @@ def _run(handle_data, base_currency=base_currency, portfolio=portfolio ) - elif exchange_name == 'bittrex': exchanges[exchange_name] = Bittrex( key=exchange_auth['key'], @@ -179,7 +178,7 @@ def _run(handle_data, portfolio=portfolio ) elif exchange_name == 'poloniex': - exchange = Poloniex( + exchanges[exchange_name] = Poloniex( key=exchange_auth['key'], secret=exchange_auth['secret'], base_currency=base_currency, From 87ecf6114d237da4ad6ed831707c05d5fdf23948 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Tue, 26 Sep 2017 13:32:23 -0600 Subject: [PATCH 27/29] adding min_trade_size in TradingPair --- catalyst/assets/_assets.pyx | 17 ++++++++++++----- catalyst/exchange/exchange.py | 8 +++++++- catalyst/exchange/exchange_algorithm.py | 6 +++--- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/catalyst/assets/_assets.pyx b/catalyst/assets/_assets.pyx index 05144223..b98127f7 100644 --- a/catalyst/assets/_assets.pyx +++ b/catalyst/assets/_assets.pyx @@ -408,7 +408,8 @@ cdef class TradingPair(Asset): 'exchange_full', 'leverage', 'market_currency', - 'base_currency' + 'base_currency', + 'min_trade_size', }) def __init__(self, object symbol, @@ -420,7 +421,8 @@ cdef class TradingPair(Asset): object end_date=None, object first_traded=None, object auto_close_date=None, - object exchange_full=None): + object exchange_full=None, + object min_trade_size=None): """ Replicates the Asset constructor with some built-in conventions and a new 'leverage' attribute. @@ -476,6 +478,7 @@ cdef class TradingPair(Asset): :param first_traded: :param auto_close_date: :param exchange_full: + :param min_trade_size: """ symbol = symbol.lower() @@ -509,6 +512,7 @@ cdef class TradingPair(Asset): first_traded=first_traded, auto_close_date=auto_close_date, exchange_full=exchange_full, + min_trade_size=min_trade_size ) self.leverage = leverage @@ -518,14 +522,16 @@ cdef class TradingPair(Asset): 'Introduced On: {start_date}, ' \ 'Market Currency: {market_currency}, ' \ 'Base Currency: {base_currency}, ' \ - 'Exchange Leverage: {leverage}'.format( + 'Exchange Leverage: {leverage}, ' \ + 'Minimum Trade Size: {min_trade_size}'.format( symbol=self.symbol, sid=self.sid, exchange=self.exchange, start_date=self.start_date, market_currency=self.market_currency, base_currency=self.base_currency, - leverage=self.leverage + leverage=self.leverage, + min_trade_size=self.min_trade_size ) cpdef __reduce__(self): @@ -544,7 +550,8 @@ cdef class TradingPair(Asset): self.end_date, self.first_traded, self.auto_close_date, - self.exchange_full)) + self.exchange_full, + self.min_trade_size)) def make_asset_array(int size, Asset asset): cdef np.ndarray out = np.empty([size], dtype=object) diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index ad4e89b9..ade14dfe 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -218,13 +218,19 @@ class Exchange: else: asset_name = None + if 'min_trade_size' in asset: + min_trade_size = asset['min_trade_size'] + else: + min_trade_size = 0.0000001 + trading_pair = TradingPair( symbol=asset['symbol'], exchange=self.name, start_date=start_date, end_date=end_date, leverage=leverage, - asset_name=asset_name + asset_name=asset_name, + min_trade_size=min_trade_size ) self.assets[exchange_symbol] = trading_pair diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index c9db3a12..3304fcc1 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -52,6 +52,7 @@ from catalyst.utils.api_support import ( from catalyst.utils.input_validation import error_keywords, ensure_upper_case, \ expect_types from catalyst.utils.preprocess import preprocess +from catalyst.utils.math_utils import round_nearest log = logbook.Logger('exchange_algorithm') @@ -67,15 +68,14 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs) - def round_order(self, amount): + def round_order(self, amount, asset): """ We need fractions with cryptocurrencies :param amount: :return: """ - # TODO: is this good enough? Victor has a better solution. - return amount + return round_nearest(amount, asset.min_trade_size) @api_method @preprocess(symbol_str=ensure_upper_case) From 1d0faf693d0f57d6ffe22beb186e956adde97b61 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Tue, 26 Sep 2017 15:36:14 -0600 Subject: [PATCH 28/29] WIP: Poloniex exchange - create order --- catalyst/exchange/poloniex/poloniex.py | 121 ++++++++------------- catalyst/exchange/poloniex/poloniex_api.py | 95 +++++++++------- 2 files changed, 100 insertions(+), 116 deletions(-) diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index 2b5b3586..92204e27 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -251,8 +251,6 @@ class Poloniex(Exchange): def create_order(self, asset, amount, is_buy, style): - pass - ''' """ Creating order on the exchange. @@ -263,59 +261,43 @@ class Poloniex(Exchange): :return: """ exchange_symbol = self.get_symbol(asset) - if isinstance(style, ExchangeLimitOrder) \ - or isinstance(style, ExchangeStopLimitOrder): + + if isinstance(style, ExchangeLimitOrder) or isinstance(style, ExchangeStopLimitOrder): + if isinstance(style, ExchangeStopLimitOrder): + log.warn('{} will ignore the stop price'.format(self.name)) + price = style.get_limit_price(is_buy) - order_type = 'limit' - elif isinstance(style, ExchangeStopOrder): - price = style.get_stop_price(is_buy) - order_type = 'stop' + try: + if(is_buy): + response = self.api.buy(exchange_symbol, amount, price) + else: + reponse = self.api.sell(exchange_symbol, amount, price) + except Exception as e: + raise ExchangeRequestError(error=e) + date = pd.Timestamp.utcnow() + + print(response) + + if('orderNumber' in response): + order_id = str(response['orderNumber']) + order = Order( + dt=date, + asset=asset, + amount=amount, + stop=style.get_stop_price(is_buy), + limit=style.get_limit_price(is_buy), + id=order_id + ) + return order + else: + log.warn('{} order failed: {}'.format('buy' if is_buy else 'sell', response['error'])) + return None else: raise InvalidOrderStyle(exchange=self.name, style=style.__class__.__name__) - req = dict( - symbol=exchange_symbol, - amount=str(float(abs(amount))), - price="{:.20f}".format(float(price)), - side='buy' if is_buy else 'sell', - type='exchange ' + order_type, # TODO: support margin trades - exchange=self.name, - is_hidden=False, - is_postonly=False, - use_all_available=0, - ocoorder=False, - buy_price_oco=0, - sell_price_oco=0 - ) - - date = pd.Timestamp.utcnow() - try: - response = self._request('order/new', req) - order_status = response.json() - except Exception as e: - raise ExchangeRequestError(error=e) - - if 'message' in order_status: - raise ExchangeRequestError( - error='unable to create Bitfinex order {}'.format( - order_status['message']) - ) - - order_id = str(order_status['id']) - order = Order( - dt=date, - asset=asset, - amount=amount, - stop=style.get_stop_price(is_buy), - limit=style.get_limit_price(is_buy), - id=order_id - ) - - return order - ''' def get_open_orders(self, asset='all'): """Retrieve all of the current open orders. @@ -420,51 +402,40 @@ class Poloniex(Exchange): :param assets: :return: - """ - pass - - ''' - symbols = self._get_v2_symbols(assets) + """ + symbols = [] + for asset in assets: + symbols.append(self.get_symbol(asset)) log.debug('fetching tickers {}'.format(symbols)) try: - response = requests.get( - '{url}/v2/tickers?symbols={symbols}'.format( - url=self.url, - symbols=','.join(symbols), - ) - ) + response = self.api.returnticker() except Exception as e: raise ExchangeRequestError(error=e) - if 'error' in response.content: + if 'error' in response: raise ExchangeRequestError( error='Unable to retrieve tickers: {}'.format( - response.content) + response['error']) ) - tickers = response.json() - ticks = dict() - for index, ticker in enumerate(tickers): - if not len(ticker) == 11: - raise ExchangeRequestError( - error='Invalid ticker in response: {}'.format(ticker) - ) + + for index, symbol in enumerate(symbols): ticks[assets[index]] = dict( timestamp=pd.Timestamp.utcnow(), - bid=ticker[1], - ask=ticker[3], - last_price=ticker[7], - low=ticker[10], - high=ticker[9], - volume=ticker[8], + bid=float(response[symbol]['highestBid']), + ask=float(response[symbol]['lowestAsk']), + last_price=float(response[symbol]['last']), + low=float(response[symbol]['lowestAsk']), #TODO: Polo does not provide low + high=float(response[symbol]['highestBid']), #TODO: Polo does not provide high + volume=float(response[symbol]['baseVolume']), ) log.debug('got tickers {}'.format(ticks)) return ticks - ''' + def generate_symbols_json(self, filename=None): symbol_map = {} diff --git a/catalyst/exchange/poloniex/poloniex_api.py b/catalyst/exchange/poloniex/poloniex_api.py index ee5f5198..3a181c07 100644 --- a/catalyst/exchange/poloniex/poloniex_api.py +++ b/catalyst/exchange/poloniex/poloniex_api.py @@ -49,10 +49,10 @@ class Poloniex_api(object): return json.loads(urlopen(req).read()) def returnticker(self): - return self.query('returnTicker') + return self.query('returnTicker', {}) def return24volume(self): - return self.query('return24Volume') + return self.query('return24Volume', {}) def returnOrderBook(self, market='all'): return self.query('returnOrderBook', {'currencyPair': market}) @@ -69,7 +69,7 @@ class Poloniex_api(object): 'start': start, 'end': end}) def returncurrencies(self): - return self.query('returnCurrencies') + return self.query('returnCurrencies', {}) def returnloadorders(self, market): return self.query('returnLoanOrders', {'currency': market}) @@ -77,52 +77,65 @@ class Poloniex_api(object): def returnbalances(self): return self.query('returnBalances') + def returncompletebalances(self, account): + if(account): + return self.query('returnCompleteBalances', {'account': account}) + else: + return self.query('returnCompleteBalances') + + def returndepositaddresses(self): + return self.query('returnDepositAddresses') + + def generatenewaddress(self, currency): + return self.query('generateNewAddress', {'currency': currency}) + + def returnDepositsWithdrawals(self, start, end): + return self.query('returnDepositsWithdrawals', {'start': start, 'end': end}) + def returnopenorders(self, market): return self.query('returnOpenOrders', {'currencyPair': market}) + def returntradehistory(self, market): + #TODO: optional start and/or end and limit + return self.query('returnTradeHistory', {'currencyPair': market}) + + def returnordertrades(self, ordernumber): + return self.query('returnOrderTrades', {'orderNumber': ordernumber}) + + def buy(self, market, amount, rate, fillorkill=0, immediateorcancel=0, postonly=0): + if(fillorkill): + return self.query('buy', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'fillOrKill': fillorkill, }) + elif(immediateorcancel): + return self.query('buy', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'immediateOrCancel': immediateorcancel, }) + elif(postonly): + return self.query('buy', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'postOnly': postonly, }) + else: + return self.query('buy', {'currencyPair': market, 'rate':rate, 'amount': amount, }) + + def sell(self, market, amount, rate, fillorkill=0, immediateorcancel=0, postonly=0): + if(fillorkill): + return self.query('sell', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'fillOrKill': fillorkill, }) + elif(immediateorcancel): + return self.query('sell', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'immediateOrCancel': immediateorcancel, }) + elif(postonly): + return self.query('sell', {'currencyPair': market, 'rate':rate, 'amount': amount, + 'postOnly': postonly, }) + else: + return self.query('sell', {'currencyPair': market, 'rate':rate, 'amount': amount, }) + def cancelorder(self, ordernumber): return self.query('cancelOrder', {'orderNumber': ordernumber}) - ''' - def buylimit(self, market, quantity, rate): - return self.query('buylimit', {'market': market, 'quantity': quantity, - 'rate': rate}) - - def buymarket(self, market, quantity): - return self.query('buymarket', - {'market': market, 'quantity': quantity}) - - def selllimit(self, market, quantity, rate): - return self.query('selllimit', {'market': market, 'quantity': quantity, - 'rate': rate}) - - def sellmarket(self, market, quantity): - return self.query('sellmarket', - {'market': market, 'quantity': quantity}) - - def getbalance(self, currency): - return self.query('getbalance', {'currency': currency}) - - def getdepositaddress(self, currency): - return self.query('getdepositaddress', {'currency': currency}) - def withdraw(self, currency, quantity, address): return self.query('withdraw', - {'currency': currency, 'quantity': quantity, + {'currency': currency, 'amount': quantity, 'address': address}) - def getorder(self, uuid): - return self.query('getorder', {'uuid': uuid}) + def returnfeeinfo(self): + return self.query('returnFeeInfo') - def getorderhistory(self, market, count): - return self.query('getorderhistory', - {'market': market, 'count': count}) - - def getwithdrawalhistory(self, currency, count): - return self.query('getwithdrawalhistory', - {'currency': currency, 'count': count}) - - def getdeposithistory(self, currency, count): - return self.query('getdeposithistory', - {'currency': currency, 'count': count}) - ''' From 3362dbf95c0c0cfb6492afe5ac040ff1d9757e95 Mon Sep 17 00:00:00 2001 From: Victor Grau Serrat Date: Wed, 27 Sep 2017 14:31:15 -0600 Subject: [PATCH 29/29] WIP: Poloniex exchange - placing orders, executing transactions --- catalyst/exchange/exchange_errors.py | 6 ++ catalyst/exchange/exchange_portfolio.py | 24 +++++ catalyst/exchange/poloniex/poloniex.py | 138 ++++++++++++++++++++---- 3 files changed, 146 insertions(+), 22 deletions(-) diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index a6bfcbab..7e751981 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -101,6 +101,12 @@ class OrphanOrderError(ZiplineError): ).strip() +class OrphanOrderReverseError(ZiplineError): + msg = ( + 'Order {order_id} tracked by algorithm, but not found in exchange {exchange}.' + ).strip() + + class OrderCancelError(ZiplineError): msg = ( 'Unable to cancel order {order_id} on exchange {exchange} {error}.' diff --git a/catalyst/exchange/exchange_portfolio.py b/catalyst/exchange/exchange_portfolio.py index ded8a2a4..d7105480 100644 --- a/catalyst/exchange/exchange_portfolio.py +++ b/catalyst/exchange/exchange_portfolio.py @@ -70,6 +70,30 @@ class ExchangePortfolio(Portfolio): log.debug('updated portfolio with executed order') + def execute_transaction(self, transaction): + log.debug('executing transaction {}'.format(transaction.order_id)) + + order_position = self.positions[transaction.asset] \ + if transaction.asset in self.positions else None + + if order_position is None: + raise ValueError( + 'Trying to execute transaction for a position not held: %s' % transaction.order_id + ) + + self.capital_used += transaction.amount * transaction.price + + if transaction.amount > 0: + if order_position.cost_basis > 0: + order_position.cost_basis = np.average( + [order_position.cost_basis, transaction.price], + weights=[order_position.amount, transaction.amount] + ) + else: + order_position.cost_basis = transaction.price + + log.debug('updated portfolio with executed order') + def remove_order(self, order): log.info('removing cancelled order {}'.format(order.id)) del self.open_orders[order.id] diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index 92204e27..2a0b8474 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -4,6 +4,7 @@ import hmac import json import re import time +from collections import defaultdict import numpy as np import pandas as pd @@ -16,13 +17,13 @@ from logbook import Logger from catalyst.exchange.poloniex.poloniex_api import Poloniex_api - # from websocket import create_connection from catalyst.exchange.exchange import Exchange from catalyst.exchange.exchange_errors import ( ExchangeRequestError, InvalidHistoryFrequencyError, - InvalidOrderStyle, OrderCancelError) + InvalidOrderStyle, OrderCancelError, + OrphanOrderReverseError) from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ ExchangeStopLimitOrder, ExchangeStopOrder from catalyst.finance.order import Order, ORDER_STATUS @@ -43,6 +44,7 @@ class Poloniex(Exchange): self._portfolio = portfolio self.minute_writer = None self.minute_reader = None + self.transactions = defaultdict(list) def sanitize_curency_symbol(self, exchange_symbol): @@ -102,7 +104,7 @@ class Poloniex(Exchange): order = Order( dt=date, - asset=self.assets[order_status['symbol']], + asset=self.assets[order_status['symbol']], # No such field in Poloniex amount=amount, stop=stop_price, limit=limit_price, @@ -305,17 +307,25 @@ class Poloniex(Exchange): Parameters ---------- asset : Asset - If passed and not None, return only the open orders for the given + If passed and not 'all', return only the open orders for the given asset instead of all open orders. Returns ------- open_orders : dict[list[Order]] or list[Order] - If no asset is passed this will return a dict mapping Assets + If 'all' is passed this will return a dict mapping Assets to a list containing all the open orders for the asset. If an asset is passed then this will return a list of the open orders for this asset. """ + + return self.portfolio.open_orders + + """ + TODO: Why going to the exchange if we already have this info locally? + And why creating all these Orders if we later discard them? + """ + try: if(asset=='all'): response = self.api.returnopenorders('all') @@ -330,10 +340,12 @@ class Poloniex(Exchange): order_statuses['message']) ) + print(self.portfolio.open_orders) + #TODO: Need to handle openOrders for 'all' orders = list() for order_status in response: - order, executed_price = self._create_order(order_status) + order, executed_price = self._create_order(order_status) # will Throw error b/c Polo doesn't track order['symbol'] if asset is None or asset == order.sid: orders.append(order) @@ -354,22 +366,23 @@ class Poloniex(Exchange): order : Order The order object. """ - pass - ''' + try: - response = self._request( - 'order/status', {'order_id': int(order_id)}) - order_status = response.json() + order = self._portfolio.open_orders[order_id] + except Exception as e: + raise OrphanOrderError(order_id=order_id, exchange=self.name) + + try: + response = self.api.returnopenorders(self.get_symbol(order.sid)) except Exception as e: raise ExchangeRequestError(error=e) - if 'message' in order_status: - raise ExchangeRequestError( - error='Unable to retrieve order status: {}'.format( - order_status['message']) - ) - return self._create_order(order_status) - ''' + for order in response: + if(int(order['orderNumber'])==int(order_id)): + return True + + return None + def cancel_order(self, order_param): """Cancel an open order. @@ -394,6 +407,9 @@ class Poloniex(Exchange): error=response['error'] ) + self.portfolio.remove_order(order_param) #TODO: Verify this works + + def tickers(self, assets): """ @@ -403,9 +419,8 @@ class Poloniex(Exchange): :param assets: :return: """ - symbols = [] - for asset in assets: - symbols.append(self.get_symbol(asset)) + symbols = self.get_symbols(assets) + log.debug('fetching tickers {}'.format(symbols)) try: @@ -453,4 +468,83 @@ class Poloniex(Exchange): with open(filename,'w') as f: json.dump(symbol_map, f, sort_keys=True, indent=2, separators=(',',':')) - + + + def check_open_orders(self): + """ + Need to override this function for Poloniex: + + Loop through the list of open orders in the Portfolio object. + Check if any transactions have been executed: + If so, create a transaction and apply to the Portfolio. + Check if the order is still open: + If not, remove it from open orders + + :return: + transactions: Transaction[] + """ + transactions = list() + if self.portfolio.open_orders: + for order_id in list(self.portfolio.open_orders): + + order = self._portfolio.open_orders[order_id] + log.debug('found open order: {}'.format(order_id)) + + try: + order_open = self.get_order(order_id) + except Exception as e: + raise ExchangeRequestError(error=e) + + if(order_open): + delta = pd.Timestamp.utcnow() - order.dt + log.info( + 'order {order_id} still open after {delta}'.format( + order_id=order_id, + delta=delta ) + ) + + try: + response = self.api.returnordertrades(order_id) + except Exception as e: + raise ExchangeRequestError(error=e) + + if(response['error']): + if(not order_open): + raise OrphanOrderReverseError(order_id=order_id, exchange=self.name) + else: + for tx in response: + """ + We maintain a list of dictionaries of transactions that correspond to + partially filled orders, indexed by order_id. Every time we query + executed transactions from the exchange, we check if we had that + transaction for that order already. If not, we process it. + + When an order if fully filled, we flush the dict of transactions + associated with that order. + """ + if(not filter(lambda item: item['order_id'] == tx['tradeID'], self.transactions[order_id])): + log.debug('Got new transaction for order {}: amount {}, price {}'.format( + order_id, tx.amount, tx.rate)) + if(tx['type']=='sell'): + tx['amount'] = -tx['amount'] + transaction = Transaction( + asset=order.asset, + amount=tx['amount'], + dt=pd.to_datetime(tx['date'], utc=True), + price=tx['rate'], + order_id=tx['tradeID'], # it's a misnomer, but keeping it for compatibility + commission=tx['fee'] + ) + self.transactions[order_id].append(transaction) + self.portfolio.execute_transaction(transaction) + transactions.append(transaction) + + if(not order_open): + """ + Since transactions have been executed individually + the only thing left to do is remove them from list of open_orders + """ + del self.portfolio.open_orders[order_id] + del self.transactions[order_id] + + return transactions