diff --git a/catalyst/__main__.py b/catalyst/__main__.py index dd15e6f7..8b7f001b 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -9,7 +9,7 @@ from six import text_type from catalyst.data import bundles as bundles_module from catalyst.exchange.exchange_bundle import ExchangeBundle -from catalyst.exchange.init_utils import get_exchange +from catalyst.exchange.factory import get_exchange from catalyst.utils.cli import Date, Timestamp from catalyst.utils.run_algo import _run, load_extensions diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index b1aa1a07..acd2cc4b 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -103,8 +103,6 @@ def get_start_dt(end_dt, bar_count, data_frequency): return start_dt - - def get_month_start_end(dt): """ Returns the first and last day of the month for the specified date. @@ -215,4 +213,3 @@ def find_most_recent_time(bundle_name): return list(most_recent_bundle.keys())[0] else: return None - diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 31037882..3640bbe3 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -20,7 +20,8 @@ from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \ from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ ExchangeLimitOrder, ExchangeStopOrder from catalyst.exchange.exchange_portfolio import ExchangePortfolio -from catalyst.exchange.exchange_utils import get_exchange_symbols +from catalyst.exchange.exchange_utils import get_exchange_symbols, \ + get_frequency, resample_history_df from catalyst.finance.order import ORDER_STATUS from catalyst.finance.transaction import Transaction @@ -393,7 +394,7 @@ class Exchange: ) series = pd.Series(values, index=dates) - #TODO: ensure that this working as expected, if not use fillna + # TODO: ensure that this working as expected, if not use fillna series.reindex(periods, method='ffill', 
fill_value=previous_value) return series @@ -441,25 +442,9 @@ class Exchange: A dataframe containing the requested data. """ - freq_match = re.match(r'([0-9].*)(m|M|d|D)', frequency, re.M | re.I) - if freq_match: - candle_size = int(freq_match.group(1)) - unit = freq_match.group(2) - - else: - raise InvalidHistoryFrequencyError(frequency) - - if unit.lower() == 'd': - if data_frequency == 'minute': - data_frequency = 'daily' - - elif unit.lower() == 'm': - if data_frequency == 'daily': - data_frequency = 'minute' - - else: - raise InvalidHistoryFrequencyError(frequency) - + candle_size, unit, data_frequency = get_frequency( + frequency, data_frequency + ) adj_bar_count = candle_size * bar_count try: series = self.bundle.get_history_window_series_and_load( @@ -512,23 +497,7 @@ class Exchange: else: series[asset] = candle_series - df = pd.DataFrame(series) - - if candle_size > 1: - if field == 'open': - agg = 'first' - elif field == 'high': - agg = 'max' - elif field == 'low': - agg = 'min' - elif field == 'close': - agg = 'last' - elif field == 'volume': - agg = 'sum' - else: - raise ValueError('Invalid field.') - - df = df.resample('{}T'.format(candle_size)).agg(agg) + df = resample_history_df(pd.DataFrame(series), candle_size, field) return df diff --git a/catalyst/exchange/exchange_blotter.py b/catalyst/exchange/exchange_blotter.py index 2ca2cb88..8fcbf623 100644 --- a/catalyst/exchange/exchange_blotter.py +++ b/catalyst/exchange/exchange_blotter.py @@ -5,7 +5,7 @@ from catalyst.constants import LOG_LEVEL from catalyst.finance.blotter import Blotter from catalyst.finance.commission import CommissionModel from catalyst.finance.slippage import SlippageModel -from catalyst.finance.transaction import Transaction +from catalyst.finance.transaction import Transaction, create_transaction log = Logger('exchange_blotter', level=LOG_LEVEL) @@ -97,13 +97,16 @@ class TradingPairFixedSlippage(SlippageModel): execution_price, execution_volume = self.process_order(data, order) 
- transaction = Transaction( - asset=order.asset, - amount=abs(execution_volume), - dt=dt, - price=execution_price, - order_id=order.id + transaction = create_transaction( + order, dt, execution_price, execution_volume ) + # transaction = Transaction( + # asset=order.asset, + # amount=abs(execution_volume), + # dt=dt, + # price=execution_price, + # order_id=order.id + # ) self._volume_for_bar += abs(transaction.amount) yield order, transaction diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/exchange_data_portal.py similarity index 97% rename from catalyst/exchange/data_portal_exchange.py rename to catalyst/exchange/exchange_data_portal.py index 3eebf3aa..3371c584 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -26,6 +26,7 @@ from catalyst.exchange.exchange_errors import ( ExchangeRequestError, ExchangeBarDataError, PricingDataNotLoadedError) +from catalyst.exchange.exchange_utils import get_frequency, resample_history_df log = Logger('DataPortalExchange', level=LOG_LEVEL) @@ -300,16 +301,23 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): :param ffill: :return: """ - bundle = self.exchange_bundles[exchange.name] + + candle_size, unit, data_frequency = get_frequency( + frequency, data_frequency + ) + adj_bar_count = candle_size * bar_count + series = bundle.get_history_window_series_and_load( assets=assets, end_dt=end_dt, - bar_count=bar_count, + bar_count=adj_bar_count, field=field, data_frequency=data_frequency ) - return pd.DataFrame(series) + + df = resample_history_df(pd.DataFrame(series), candle_size, field) + return df def get_exchange_spot_value(self, exchange, assets, field, dt, data_frequency): diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 77e0fc56..3bd35223 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -1,6 +1,7 @@ import json import os import pickle +import re from 
catalyst.assets._assets import TradingPair from six.moves.urllib import request @@ -8,7 +9,8 @@ from datetime import date, datetime import pandas as pd -from catalyst.exchange.exchange_errors import ExchangeSymbolsNotFound +from catalyst.exchange.exchange_errors import ExchangeSymbolsNotFound, \ + InvalidHistoryFrequencyError from catalyst.utils.paths import data_root, ensure_directory, \ last_modified_time @@ -17,6 +19,13 @@ SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \ def get_exchange_folder(exchange_name, environ=None): + """ + The root path of an exchange folder. + + :param exchange_name: + :param environ: + :return: + """ if not environ: environ = os.environ @@ -28,11 +37,25 @@ def get_exchange_folder(exchange_name, environ=None): def get_exchange_symbols_filename(exchange_name, environ=None): + """ + The absolute path of the exchange's symbol.json file. + + :param exchange_name: + :param environ: + :return: + """ exchange_folder = get_exchange_folder(exchange_name, environ) return os.path.join(exchange_folder, 'symbols.json') def download_exchange_symbols(exchange_name, environ=None): + """ + Downloads the exchange's symbols.json from the repository. + + :param exchange_name: + :param environ: + :return: response + """ filename = get_exchange_symbols_filename(exchange_name) url = SYMBOLS_URL.format(exchange=exchange_name) response = request.urlretrieve(url=url, filename=filename) @@ -40,6 +63,13 @@ def download_exchange_symbols(exchange_name, environ=None): def get_exchange_symbols(exchange_name, environ=None): + """ + The de-serialized content of the exchange's symbols.json. + + :param exchange_name: + :param environ: + :return: + """ filename = get_exchange_symbols_filename(exchange_name) if not os.path.isfile(filename) or \ @@ -60,11 +90,24 @@ def get_exchange_symbols(exchange_name, environ=None): def get_symbols_string(assets): + """ + A concatenated string of symbols from a list of assets. 
+ + :param assets: + :return: + """ array = [assets] if isinstance(assets, TradingPair) else assets return ', '.join([asset.symbol for asset in array]) def get_exchange_auth(exchange_name, environ=None): + """ + The de-serialized content of the exchange's auth.json file. + + :param exchange_name: + :param environ: + :return: + """ exchange_folder = get_exchange_folder(exchange_name, environ) filename = os.path.join(exchange_folder, 'auth.json') @@ -81,6 +124,13 @@ def get_exchange_auth(exchange_name, environ=None): def get_algo_folder(algo_name, environ=None): + """ + The algorithm root folder of the algorithm. + + :param algo_name: + :param environ: + :return: + """ if not environ: environ = os.environ @@ -92,6 +142,15 @@ def get_algo_folder(algo_name, environ=None): def get_algo_object(algo_name, key, environ=None, rel_path=None): + """ + The de-serialized object of the algo name and key. + + :param algo_name: + :param key: + :param environ: + :param rel_path: + :return: + """ if algo_name is None: return None @@ -113,6 +172,16 @@ def get_algo_object(algo_name, key, environ=None, rel_path=None): def save_algo_object(algo_name, key, obj, environ=None, rel_path=None): + """ + Serialize and save an object by algo name and key. 
+ + :param algo_name: + :param key: + :param obj: + :param environ: + :param rel_path: + :return: + """ folder = get_algo_folder(algo_name, environ) if rel_path is not None: @@ -125,16 +194,16 @@ def save_algo_object(algo_name, key, obj, environ=None, rel_path=None): pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) -def append_algo_object(algo_name, key, obj, environ=None): - algo_folder = get_algo_folder(algo_name, environ) - filename = os.path.join(algo_folder, key + '.p') - - mode = 'a+b' if os.path.isfile(filename) else 'wb' - with open(filename, mode) as handle: - pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) - - def get_algo_df(algo_name, key, environ=None, rel_path=None): + """ + The de-serialized DataFrame of an algo name and key. + + :param algo_name: + :param key: + :param environ: + :param rel_path: + :return: + """ folder = get_algo_folder(algo_name, environ) if rel_path is not None: @@ -153,6 +222,16 @@ def get_algo_df(algo_name, key, environ=None, rel_path=None): def save_algo_df(algo_name, key, df, environ=None, rel_path=None): + """ + Serialize to csv and save a DataFrame by algo name and key. + + :param algo_name: + :param key: + :param df: + :param environ: + :param rel_path: + :return: + """ folder = get_algo_folder(algo_name, environ) if rel_path is not None: @@ -166,6 +245,13 @@ def save_algo_df(algo_name, key, df, environ=None, rel_path=None): def get_exchange_minute_writer_root(exchange_name, environ=None): + """ + The minute writer folder for the exchange. + + :param exchange_name: + :param environ: + :return: + """ exchange_folder = get_exchange_folder(exchange_name, environ) minute_data_folder = os.path.join(exchange_folder, 'minute_data') @@ -175,6 +261,13 @@ def get_exchange_minute_writer_root(exchange_name, environ=None): def get_exchange_bundles_folder(exchange_name, environ=None): + """ + The temp folder for bundle downloads by exchange name. 
+ + :param exchange_name: + :param environ: + :return: + """ exchange_folder = get_exchange_folder(exchange_name, environ) temp_bundles = os.path.join(exchange_folder, 'temp_bundles') @@ -184,8 +277,86 @@ def get_exchange_bundles_folder(exchange_name, environ=None): def perf_serial(obj): - """JSON serializer for objects not serializable by default json code""" + """ + JSON serializer for objects not serializable by default json code + :param obj: + :return: + """ if isinstance(obj, (datetime, date)): return obj.isoformat() + raise TypeError("Type %s not serializable" % type(obj)) + + +def get_common_assets(exchanges): + """ + The assets available in all specified exchanges. + + :param exchanges: + :return: + """ + symbols = [] + for exchange_name in exchanges: + s = [asset.symbol for asset in exchanges[exchange_name].get_assets()] + symbols.append(s) + + inter_symbols = set.intersection(*map(set, symbols)) + + assets = [] + for symbol in inter_symbols: + for exchange_name in exchanges: + asset = exchanges[exchange_name].get_asset(symbol) + assets.append(asset) + + return assets + + +def get_frequency(freq, data_frequency): + if freq == 'daily': + freq = '1d' + elif freq == 'minute': + freq = '1m' + + freq_match = re.match(r'([0-9].*)(m|M|d|D)', freq, re.M | re.I) + if freq_match: + candle_size = int(freq_match.group(1)) + unit = freq_match.group(2) + + else: + raise InvalidHistoryFrequencyError(freq) + + if unit.lower() == 'd': + if data_frequency == 'minute': + data_frequency = 'daily' + + elif unit.lower() == 'm': + if data_frequency == 'daily': + data_frequency = 'minute' + + else: + raise InvalidHistoryFrequencyError(freq) + + return candle_size, unit, data_frequency + + +def resample_history_df(df, candle_size, field): + if candle_size > 1: + if field == 'open': + agg = 'first' + elif field == 'high': + agg = 'max' + elif field == 'low': + agg = 'min' + elif field == 'close': + agg = 'last' + elif field == 'volume': + agg = 'sum' + else: + raise 
ValueError('Invalid field.') + + # TODO: pad with nan? + return df.resample('{}T'.format(candle_size)).agg(agg) + + else: + return df diff --git a/catalyst/exchange/init_utils.py b/catalyst/exchange/factory.py similarity index 72% rename from catalyst/exchange/init_utils.py rename to catalyst/exchange/factory.py index a37f0441..72c66bd8 100644 --- a/catalyst/exchange/init_utils.py +++ b/catalyst/exchange/factory.py @@ -5,28 +5,39 @@ from catalyst.exchange.exchange_utils import get_exchange_auth from catalyst.exchange.poloniex.poloniex import Poloniex -def get_exchange(exchange_name): +def get_exchange(exchange_name, base_currency=None): exchange_auth = get_exchange_auth(exchange_name) if exchange_name == 'bitfinex': return Bitfinex( key=exchange_auth['key'], secret=exchange_auth['secret'], - base_currency=None, # TODO: make optional at the exchange + base_currency=base_currency, portfolio=None ) + elif exchange_name == 'bittrex': return Bittrex( key=exchange_auth['key'], secret=exchange_auth['secret'], - base_currency=None, + base_currency=base_currency, portfolio=None ) + elif exchange_name == 'poloniex': return Poloniex( key=exchange_auth['key'], secret=exchange_auth['secret'], - base_currency=None, + base_currency=base_currency, portfolio=None ) + else: raise ExchangeNotFoundError(exchange_name=exchange_name) + + +def get_exchanges(exchange_names): + exchanges = dict() + for exchange_name in exchange_names: + exchanges[exchange_name] = get_exchange(exchange_name) + + return exchanges diff --git a/catalyst/support/__init__.py b/catalyst/support/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/support/issue_44.py b/catalyst/support/issue_44.py new file mode 100644 index 00000000..b6d1277a --- /dev/null +++ b/catalyst/support/issue_44.py @@ -0,0 +1,109 @@ +import pandas as pd +from catalyst import run_algorithm +from catalyst.exchange.exchange_utils import get_exchange_symbols + +from catalyst.api import ( + symbols, +) + + +def 
initialize(context): + context.i = -1 + context.base_currency = 'btc' + + +def handle_data(context, data): + lookback = 60 * 24 * 7 # (minutes, hours, days) + context.i += 1 + if context.i < lookback: + return + + today = context.blotter.current_dt.strftime('%Y-%m-%d %H:%M:%S') + + try: + # update universe everyday + new_day = 60 * 24 + if not context.i % new_day: + context.universe = universe(context, today) + + # get data every 30 minutes + minutes = 30 + if not context.i % minutes and context.universe: + for coin in context.coins: + pair = str(coin.symbol) + + # ohlcv data + open = data.history(coin, 'open', lookback, + '1m').ffill().bfill().resample( + '30T').first() + high = data.history(coin, 'high', lookback, + '1m').ffill().bfill().resample('30T').max() + low = data.history(coin, 'low', lookback, + '1m').ffill().bfill().resample('30T').min() + close = data.history(coin, 'price', lookback, + '1m').ffill().bfill().resample( + '30T').last() + volume = data.history(coin, 'volume', lookback, + '1m').ffill().bfill().resample( + '30T').sum() + + print(today, pair, close[-1]) + + except Exception as e: + print(e) + + +def analyze(context=None, results=None): + pass + + +def universe(context, today): + json_symbols = get_exchange_symbols('poloniex') + poloniex_universe_df = pd.DataFrame.from_dict( + json_symbols).transpose().astype(str) + poloniex_universe_df['base_currency'] = poloniex_universe_df.apply( + lambda row: row.symbol.split('_')[1], + axis=1) + poloniex_universe_df['market_currency'] = poloniex_universe_df.apply( + lambda row: row.symbol.split('_')[0], + axis=1) + poloniex_universe_df = poloniex_universe_df[ + poloniex_universe_df['base_currency'] == context.base_currency] + poloniex_universe_df = poloniex_universe_df[ + poloniex_universe_df.symbol != 'gas_btc'] + + # Markets currently not working on Catalyst 0.3.1 + # 2017-01-01 + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'bcn_btc'] + # poloniex_universe_df = 
poloniex_universe_df[poloniex_universe_df.symbol != 'burst_btc'] + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'dgb_btc'] + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'doge_btc'] + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'emc2_btc'] + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'pink_btc'] + # poloniex_universe_df = poloniex_universe_df[poloniex_universe_df.symbol != 'sc_btc'] + print(poloniex_universe_df.head()) + + date = str(today).split(' ')[0] + + poloniex_universe_df = poloniex_universe_df[ + poloniex_universe_df.start_date < date] + context.coins = symbols(*poloniex_universe_df.symbol) + print(len(poloniex_universe_df)) + return poloniex_universe_df.symbol.tolist() + + +if __name__ == '__main__': + start_date = pd.to_datetime('2017-01-01', utc=True) + end_date = pd.to_datetime('2017-10-15', utc=True) + + performance = run_algorithm(start=start_date, end=end_date, + capital_base=10000.0, + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='poloniex', + data_frequency='minute', + base_currency='btc', + live=False, + live_graph=False, + algo_namespace='test') diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index a5293ac5..c12dedc9 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -31,7 +31,7 @@ import catalyst.utils.paths as pth from catalyst.exchange.exchange_algorithm import ExchangeTradingAlgorithmLive, \ ExchangeTradingAlgorithmBacktest -from catalyst.exchange.data_portal_exchange import DataPortalExchangeLive, \ +from catalyst.exchange.exchange_data_portal import DataPortalExchangeLive, \ DataPortalExchangeBacktest from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.exchange_portfolio import ExchangePortfolio diff --git a/tests/exchange/__init.py b/tests/exchange/__init.py new file mode 100644 index 
00000000..e69de29b diff --git a/tests/exchange/base.py b/tests/exchange/base.py index b98c3ec1..da67e8f0 100644 --- a/tests/exchange/base.py +++ b/tests/exchange/base.py @@ -1,4 +1,3 @@ -import unittest from abc import ABCMeta, abstractmethod diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 9c39e4e4..69b31400 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -137,10 +137,10 @@ class TestExchangeBundle: log.info('ingesting exchange bundle {}'.format(exchange_name)) exchange_bundle.ingest( data_frequency=data_frequency, - include_symbols=include_symbols, + include_symbols=None, exclude_symbols=None, - start=start, - end=end, + start=None, + end=None, show_progress=True ) diff --git a/tests/exchange/test_data_portal.py b/tests/exchange/test_data_portal.py index 8a605a0b..31c67295 100644 --- a/tests/exchange/test_data_portal.py +++ b/tests/exchange/test_data_portal.py @@ -1,47 +1,37 @@ import pandas as pd +from catalyst.exchange.exchange_data_portal import DataPortalExchangeBacktest, \ + DataPortalExchangeLive from logbook import Logger +from test_utils import rnd_history_date_days, rnd_bar_count from catalyst import get_calendar from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.bittrex.bittrex import Bittrex -from catalyst.exchange.data_portal_exchange import DataPortalExchangeBacktest, \ - DataPortalExchangeLive -from catalyst.exchange.exchange_utils import get_exchange_auth +from catalyst.exchange.exchange_utils import get_exchange_auth, \ + get_common_assets +from catalyst.exchange.factory import get_exchange, get_exchanges log = Logger('test_bitfinex') -class TestExchangeDataPortalTestCase: +class TestExchangeDataPortal: @classmethod def setup(self): log.info('creating bitfinex exchange') - auth_bitfinex = get_exchange_auth('bitfinex') - self.bitfinex = Bitfinex( - key=auth_bitfinex['key'], - 
secret=auth_bitfinex['secret'], - base_currency='usd' - ) - - log.info('creating bittrex exchange') - auth_bitfinex = get_exchange_auth('bittrex') - self.bittrex = Bittrex( - key=auth_bitfinex['key'], - secret=auth_bitfinex['secret'], - base_currency='usd' - ) - + exchanges = get_exchanges(['bitfinex', 'bittrex', 'poloniex']) open_calendar = get_calendar('OPEN') asset_finder = AssetFinderExchange() self.data_portal_live = DataPortalExchangeLive( - exchanges=dict(bitfinex=self.bitfinex, bittrex=self.bittrex), + exchanges=exchanges, asset_finder=asset_finder, trading_calendar=open_calendar, first_trading_day=pd.to_datetime('today', utc=True) ) + self.data_portal_backtest = DataPortalExchangeBacktest( - exchanges=dict(bitfinex=self.bitfinex), + exchanges=exchanges, asset_finder=asset_finder, trading_calendar=open_calendar, first_trading_day=None # will set dynamically based on assets @@ -106,3 +96,20 @@ class TestExchangeDataPortalTestCase: assets, 'close', date, 'minute') log.info('found spot value {}'.format(value)) pass + + def test_history_compare_exchanges(self): + exchanges = get_exchanges(['bittrex', 'bitfinex', 'poloniex']) + assets = get_common_assets(exchanges) + + date = rnd_history_date_days() + bar_count = rnd_bar_count() + data = self.data_portal_backtest.get_history_window( + assets=assets, + end_dt=date, + bar_count=bar_count, + frequency='1d', + field='close', + data_frequency='daily' + ) + + log.info('found history window: {}'.format(data)) diff --git a/tests/exchange/test_utils.py b/tests/exchange/test_utils.py new file mode 100644 index 00000000..eb53bff5 --- /dev/null +++ b/tests/exchange/test_utils.py @@ -0,0 +1,17 @@ +from datetime import timedelta +from random import randint + +import pandas as pd + + +def rnd_history_date_days(max_days=30): + now = pd.Timestamp.utcnow() + days = randint(0, max_days) + + return now - timedelta(days=days) + + +def rnd_bar_count(max_bars=21): + now = pd.Timestamp.utcnow() + + return randint(0, max_bars)