From 9c03012aa45be59411b44f596a9a2053e93f489d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 22 Jun 2017 20:50:07 -0700 Subject: [PATCH] Untold improvements --- catalyst/data/bundles/poloniex.py | 24 ++- catalyst/data/loader.py | 180 ++++++++++++++++++ catalyst/examples/buy_and_hold.py | 91 ++++++++- catalyst/examples/dual_moving_average_btc.py | 12 +- .../utils/calendars/exchange_calendar_open.py | 6 +- catalyst/utils/run_algo.py | 12 +- curate/crypto_price_generator.py | 22 +-- 7 files changed, 315 insertions(+), 32 deletions(-) diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index b37999c8..b4e8a658 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -4,6 +4,7 @@ from datetime import datetime import numpy as np import pandas as pd from pandas_datareader.data import DataReader +from pandas.tseries.offsets import DateOffset import requests from catalyst.utils.calendars import register_calendar_alias @@ -88,19 +89,30 @@ def poloniex_cryptoassets(symbols, start=None, end=None): for symbol in symbols: #def to_dataframe(self, start, end, currencyPair=None): - csv_fn = '/var/tmp/' + 'crypto_prices-' + symbol + '.csv' # TODO: DIR as parameter + csv_fn = '/var/tmp/catalyst/data/poloniex/crypto_prices-' + symbol + '.csv' # TODO: DIR as parameter #last_date = self._get_start_date(csv_fn) #if last_date + 300 < end or not os.path.exists(csv_fn): # get latest data #self.append_data_single_pair(currencyPair) # CSV holds the latest snapshot - df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) - df['date']=pd.to_datetime(df['date'], utc=True, unit='s') - df.set_index('date', inplace=True) + data = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) + data['date'] = pd.to_datetime(data['date'], utc=True, unit='s') + data.set_index('date', inplace=True) #df = df.resample('D').mean() - df = df.loc[df.index.isin(calendar.schedule.index)] + df = data.loc[data.index.isin(calendar.schedule.index)] + + offset = DateOffset(days=1) + for start_date in df.index: + end_date = start_date + offset + day_data = data[start_date:end_date] + + df[start_date]['open'] = day_data[0]['open'] + df[start_date]['high'] = day_data['high'].max() + df[start_date]['low'] = day_data['low'].min() + df[start_date]['close'] = day_data[-1]['close'] + df[start_date]['volume'] = day_data['volume'].sum() # the start date is the date of the first trade and # the end date is the date of the last trade @@ -158,7 +170,7 @@ def poloniex_cryptoassets(symbols, start=None, end=None): symbol_map = pd.Series(metadata.symbol.index, metadata.symbol) # Hardcode the exchange to "POLO" for all assets and (elsewhere) - # register "YAHOO" to resolve to the OPEN calendar, because these are + # register "POLO" to resolve to the OPEN calendar, because these are # all cryptoassets and thus use the OPEN calendar. metadata['exchange'] = 'POLO' asset_db_writer.write(equities=metadata) diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index a436f8cf..4c4fc94e 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -18,6 +18,7 @@ from collections import OrderedDict import logbook import pandas as pd from pandas_datareader.data import DataReader +import datetime import pytz from six import iteritems from six.moves.urllib_error import HTTPError @@ -91,6 +92,56 @@ def has_data_for_dates(series_or_df, first_date, last_date): first, last = dts[[0, -1]] return (first <= first_date) and (last >= last_date) +def load_crypto_market_data(trading_day=None, + trading_days=None, + bm_symbol='USDT_BTC', + environ=None): + if trading_day is None: + trading_day = get_calendar('OPEN').trading_day + if trading_days is None: + trading_days = get_calendar('OPEN').all_sessions + + first_date = trading_days[0] + now = pd.Timestamp.utcnow() + + # We expect to have benchmark and treasury data that's current up until + # **two** full trading days prior to the most recently completed trading + # day. + # Example: + # On Thu Oct 22 2015, the previous completed trading day is Wed Oct 21. + # However, data for Oct 21 doesn't become available until the early morning + # hours of Oct 22. This means that there are times on the 22nd at which we + # cannot reasonably expect to have data for the 21st available. To be + # conservative, we instead expect that at any time on the 22nd, we can + # download data for Tuesday the 20th, which is two full trading days prior + # to the date on which we're running a test. + + # We'll attempt to download new data if the latest entry in our cache is + # before this date. + last_date = trading_days[trading_days.get_loc(now, method='ffill') - 2] + + br = ensure_crypto_benchmark_data( + bm_symbol, + first_date, + last_date, + now, + # We need the trading_day to figure out the close prior to the first + # date so that we can compute returns for the first date. + trading_day, + environ, + ) + tc = ensure_treasury_data( + bm_symbol, + first_date, + last_date, + now, + environ, + ) + benchmark_returns = br[br.index.slice_indexer(first_date, last_date)] + treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)] + return benchmark_returns, treasury_curves + + def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY', environ=None): @@ -177,6 +228,135 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY', treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)] return benchmark_returns, treasury_curves +def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, + trading_day, environ=None): + filename = get_benchmark_filename(symbol) + source_filename = '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'.\ + format(symbol) + + logger.info( + ('Loading benchmark data for {symbol!r} ' + 'from {first_date} to {last_date}'), + symbol=symbol, + first_date=first_date - trading_day, + last_date=last_date + ) + + data = _load_cached_data( + filename, + first_date, + last_date, + now, + 'benchmark', + environ, + ) + + + if data is not None: + print 'benchmark data:\n', data.head() + return data + + # If no cached data was found or it was missing any dates then download the + # necessary data. + logger.info( + ('Downloading benchmark data for {symbol!r} ' + 'from {first_date} to {last_date}'), + symbol=symbol, + first_date=first_date - trading_day, + last_date=last_date + ) + + def dateparse(time_in_secs): + return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc) + + try: + data = pd.read_csv( + source_filename, + names=['date', 'open', 'high', 'low', 'close', 'volume'], + index_col=[0], + parse_dates=True, + date_parser=dateparse, + ) + data = data[['close']] + + print 'loaded benchmark data:\n', data.index + + data = data[ + (data.index >= (first_date-trading_day)) & + (data.index <= last_date) + ] + data = data.pct_change(1).iloc[1:] + + print 'writing benchmark data:\n', data.head() + + data.to_csv(get_data_filepath(filename, environ)) + except (OSError, IOError, HTTPError): + logger.exception('Failed to cache the new benchmark returns') + raise + if not has_data_for_dates(data, first_date, last_date): + logger.warn("Still don't have expected data after redownload!") + return data + + +def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day, + environ=None): + """ + Ensure we have benchmark data for `symbol` from `first_date` to `last_date` + + Parameters + ---------- + symbol : str + The symbol for the benchmark to load. + first_date : pd.Timestamp + First required date for the cache. + last_date : pd.Timestamp + Last required date for the cache. + now : pd.Timestamp + The current time. This is used to prevent repeated attempts to + re-download data that isn't available due to scheduling quirks or other + failures. + trading_day : pd.CustomBusinessDay + A trading day delta. Used to find the day before first_date so we can + get the close of the day prior to first_date. + + We attempt to download data unless we already have data stored at the data + cache for `symbol` whose first entry is before or on `first_date` and whose + last entry is on or after `last_date`. + + If we perform a download and the cache criteria are not satisfied, we wait + at least one hour before attempting a redownload. This is determined by + comparing the current time to the result of os.path.getmtime on the cache + path. + """ + filename = get_benchmark_filename(symbol) + data = _load_cached_data(filename, first_date, last_date, now, 'benchmark', + environ) + if data is not None: + return data + + # If no cached data was found or it was missing any dates then download the + # necessary data. + logger.info( + ('Downloading benchmark data for {symbol!r} ' + 'from {first_date} to {last_date}'), + symbol=symbol, + first_date=first_date - trading_day, + last_date=last_date + ) + + try: + data = get_benchmark_returns( + symbol, + first_date - trading_day, + last_date, + ) + data.to_csv(get_data_filepath(filename, environ)) + except (OSError, IOError, HTTPError): + logger.exception('Failed to cache the new benchmark returns') + raise + if not has_data_for_dates(data, first_date, last_date): + logger.warn("Still don't have expected data after redownload!") + return data def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day, environ=None): diff --git a/catalyst/examples/buy_and_hold.py b/catalyst/examples/buy_and_hold.py index 8b693e66..406229bd 100644 --- a/catalyst/examples/buy_and_hold.py +++ b/catalyst/examples/buy_and_hold.py @@ -13,21 +13,102 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from catalyst.api import order, symbol + +import numpy as np + +from catalyst.api import ( + order, + symbol, + record, +) stocks = ['USDT_BTC'] +TARGET_INVESTMENT_RATIO = 0.1 def initialize(context): context.has_ordered = False - context.stocks = stocks + context.asset = symbol('USDT_BTC') def handle_data(context, data): if not context.has_ordered: - for stock in context.stocks: - order(symbol(stock), 100) - context.has_ordered = True + price = data[context.asset].price + amt = TARGET_INVESTMENT_RATIO * (context.portfolio.cash / price) + if not np.isnan(amt): + print 'amt:', amt + order(context.asset, amt, limit_price=price*1.5) + context.has_ordered = True + + record( + USDT_BTC=data[context.asset].price, + cash=context.portfolio.cash, + leverage=context.account.leverage, + ) + +def analyze(context=None, results=None): + import matplotlib.pyplot as plt + # Plot the portfolio and asset data. + ax1 = plt.subplot(511) + results[['portfolio_value']].plot(ax=ax1) + ax1.set_ylabel('Portfolio value (USD)') + + ax2 = plt.subplot(512, sharex=ax1) + ax2.set_ylabel('USDT_BTC (USD)') + results[['USDT_BTC']].plot(ax=ax2) + + trans = results.ix[[t != [] for t in results.transactions]] + buys = trans.ix[ + [t[0]['amount'] > 0 for t in trans.transactions] + ] + sells = trans.ix[ + [t[0]['amount'] < 0 for t in trans.transactions] + ] + print 'buys:', buys.head() + ax2.plot( + buys.index, results.USDT_BTC[buys.index], + '^', + markersize=10, + color='m', + ) + ax2.plot( + sells.index, results.USDT_BTC[sells.index], + 'v', + markersize=10, + color='k', + ) + + ax3 = plt.subplot(513, sharex=ax1) + results[['leverage', 'alpha', 'beta']].plot(ax=ax3) + ax3.set_ylabel('Leverage (USD)') + + ax4 = plt.subplot(514, sharex=ax1) + results[['cash']].plot(ax=ax4) + ax4.set_ylabel('Cash (USD)') + + results[[ + 'treasury', + 'algorithm', + 'benchmark', + ]] = results[[ + 'treasury_period_return', + 'algorithm_period_return', + 'benchmark_period_return', + ]] + + ax5 = plt.subplot(515, sharex=ax1) + results[[ + 'treasury', + 'algorithm', + 'benchmark', + ]].plot(ax=ax5) + ax5.set_ylabel('Dollars (USD)') + + plt.legend(loc=3) + + # Show the plot. + plt.gcf().set_size_inches(18, 8) + plt.show() def _test_args(): diff --git a/catalyst/examples/dual_moving_average_btc.py b/catalyst/examples/dual_moving_average_btc.py index b0348799..f7b4126c 100644 --- a/catalyst/examples/dual_moving_average_btc.py +++ b/catalyst/examples/dual_moving_average_btc.py @@ -46,7 +46,7 @@ SHORT_WINDOW = 30 LONG_WINDOW = 100 def initialize(context): - context.asset = symbol('USDT_BTC') + context.asset = symbol('USDT_LTC') context.i = 0 set_commission(PerDollar(cost=0.001)) @@ -105,7 +105,7 @@ def rebalance(context, data): order_target_percent(context.asset, 0.0) record( - USDT_BTC=price, + USDT_LTC=price, cash=context.portfolio.cash, leverage=context.account.leverage, short_mavg=short_mavg, @@ -124,8 +124,8 @@ def analyze(context=None, results=None): ax1.set_ylabel('Portfolio value (USD)') ax2 = plt.subplot(512, sharex=ax1) - ax2.set_ylabel('USDT_BTC (USD)') - results[['USDT_BTC', 'short_mavg', 'long_mavg']].plot(ax=ax2) + ax2.set_ylabel('USDT_LTC (USD)') + results[['USDT_LTC', 'short_mavg', 'long_mavg']].plot(ax=ax2) trans = results.ix[[t != [] for t in results.transactions]] buys = trans.ix[ @@ -136,13 +136,13 @@ def analyze(context=None, results=None): ] print 'buys:', buys.head() ax2.plot( - buys.index, results.USDT_BTC[buys.index], + buys.index, results.USDT_LTC[buys.index], '^', markersize=10, color='m', ) ax2.plot( - sells.index, results.USDT_BTC[sells.index], + sells.index, results.USDT_LTC[sells.index], 'v', markersize=10, color='k', diff --git a/catalyst/utils/calendars/exchange_calendar_open.py b/catalyst/utils/calendars/exchange_calendar_open.py index bf5f8ce5..58cd8a58 100644 --- a/catalyst/utils/calendars/exchange_calendar_open.py +++ b/catalyst/utils/calendars/exchange_calendar_open.py @@ -1,10 +1,12 @@ from datetime import time from pytz import timezone -from .trading_calendar import TradingCalendar +from pandas.tseries.offsets import DateOffset from catalyst.utils.memoize import lazyval +from .trading_calendar import TradingCalendar + class OpenExchangeCalendar(TradingCalendar): @property @@ -25,4 +27,4 @@ class OpenExchangeCalendar(TradingCalendar): @lazyval def day(self): - return 'D' + return DateOffset(days=1) diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index c58e7835..4c60054e 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -13,10 +13,12 @@ try: except: PYGMENTS = False from toolz import valfilter, concatv +from functools import partial from catalyst.algorithm import TradingAlgorithm from catalyst.data.bundles.core import load from catalyst.data.data_portal import DataPortal +from catalyst.data.loader import load_crypto_market_data from catalyst.finance.trading import TradingEnvironment from catalyst.pipeline.data import USEquityPricing, CryptoPricing from catalyst.pipeline.loaders import ( @@ -143,9 +145,17 @@ def _run(handle_data, "invalid url %r, must begin with 'sqlite:///'" % str(bundle_data.asset_finder.engine.url), ) - env = TradingEnvironment(asset_db_path=connstr, environ=environ) + + env = TradingEnvironment( + #load=partial(load_crypto_market_data, environ=environ), + #bm_symbol='USDT_BTC', + asset_db_path=connstr, + environ=environ, + ) + first_trading_day =\ bundle_data.equity_minute_bar_reader.first_trading_day + data = DataPortal( env.asset_finder, get_calendar('NYSE'), diff --git a/curate/crypto_price_generator.py b/curate/crypto_price_generator.py index 4568820c..1e889952 100644 --- a/curate/crypto_price_generator.py +++ b/curate/crypto_price_generator.py @@ -10,7 +10,7 @@ import catalyst.data.bundles.core as bundles DT_START = time.mktime(datetime(2010, 01, 01, 0, 0).timetuple()) # DT_START = time.mktime(datetime(2017, 06, 13, 0, 0).timetuple()) # TODO: remove temp -CSV_OUT_FOLDER = '/var/tmp/catalyst/data/' +CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/' CONN_RETRIES = 2 logbook.StderrHandler().push_application() @@ -119,26 +119,24 @@ class PoloniexDataGenerator(object): Makes sure data is up to date ''' def to_dataframe(self, start, end, currencyPair=None): - csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv' - last_date = self._get_start_date(csv_fn) - if last_date + 300 < end or not os.path.exists(csv_fn): - # get latest data - self.append_data_single_pair(currencyPair) + csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv' + last_date = self._get_start_date(csv_fn) + if last_date + 300 < end or not os.path.exists(csv_fn): + # get latest data + self.append_data_single_pair(currencyPair) - # CSV holds the latest snapshot - df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) - df['date']=pd.to_datetime(df['date'],unit='s') + # CSV holds the latest snapshot + df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) + df['date']=pd.to_datetime(df['date'],unit='s') df.set_index('date', inplace=True) - #return df.loc[(df.index > start) & (df.index <= end)] + #return df.loc[(df.index > start) & (df.index <= end)] return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)] if __name__ == '__main__': pdg = PoloniexDataGenerator() pdg.get_currency_pairs() pdg.append_data() - df = pdg.to_dataframe(time.mktime(datetime(2017, 6, 01, 0, 0).timetuple()),time.mktime(datetime(2017, 6, 02, 0, 0).timetuple()),'USDT_BTC') - print(df)