From b4b19a75554e6162bd59ce0d2905d6628943c1bf Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 23 Jun 2017 18:44:45 -0700 Subject: [PATCH] USDT_BTC benchmark This commit: * Adds a crypto_benchmark that can create benchmarks for symbols found on POLO * Changes default trading calendars to OPEN * Properly computes daily bar data from five minute POLO bars * Allows trading of one hundredth of a coin, later we plan to integrate per the ratio of a full coin to its base denomination. --- .gitignore | 2 +- catalyst/algorithm.py | 4 +- catalyst/data/bundles/poloniex.py | 151 +++++++++--------- catalyst/data/loader.py | 73 +++++++-- catalyst/examples/buy_and_hold.py | 4 +- catalyst/examples/dual_moving_average_btc.py | 28 +++- .../pipeline/loaders/crypto_pricing_loader.py | 2 +- catalyst/utils/calendars/calendar_utils.py | 2 + .../utils/calendars/exchange_calendar_open.py | 2 +- catalyst/utils/calendars/trading_calendar.py | 2 +- catalyst/utils/factory.py | 2 +- catalyst/utils/run_algo.py | 11 +- 12 files changed, 177 insertions(+), 106 deletions(-) diff --git a/.gitignore b/.gitignore index ac8834e4..7e2c83e8 100644 --- a/.gitignore +++ b/.gitignore @@ -75,6 +75,6 @@ zipline.iml *.pickle # data -data +./data TAGS diff --git a/catalyst/algorithm.py b/catalyst/algorithm.py index 4cb5e05e..a6fdcebf 100644 --- a/catalyst/algorithm.py +++ b/catalyst/algorithm.py @@ -289,7 +289,7 @@ class TradingAlgorithm(object): # If a schedule has been provided, pop it. Otherwise, use NYSE. self.trading_calendar = kwargs.pop( 'trading_calendar', - get_calendar('NYSE') + get_calendar('OPEN') ) self.sim_params = kwargs.pop('sim_params', None) @@ -1115,7 +1115,7 @@ class TradingAlgorithm(object): if calendar is None: cal = self.trading_calendar elif calendar is calendars.CRYPTO_ASSETS: - cal = get_calendar('NYSE') + cal = get_calendar('OPEN') elif calendar is calendars.US_EQUITIES: cal = get_calendar('NYSE') elif calendar is calendars.US_FUTURES: diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index b4e8a658..500c7cf9 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -84,88 +84,95 @@ def poloniex_cryptoassets(symbols, start=None, end=None): ('symbol', 'object'), ])) + day_offset = pd.Timedelta(days=1) + + def compute_daily_bars(five_min_bars): + # filter and copy the entry at the beginning of each session + daily_bars = five_min_bars[ + five_min_bars.index.isin(calendar.all_sessions) + ].copy() + + # iterate through session starts doing: + # 1. filter five_min_bars to get all entries in one day + # 2. compute daily bar entry + # 3. record in rid-th row of daily_bars + for rid, start_date in enumerate(daily_bars.index): + # compute beginning of next session + end_date = start_date + day_offset + + # filter for entries session entries + day_data = five_min_bars[ + (five_min_bars.index >= start_date) & + (five_min_bars.index < end_date) + ] + + # compute and record daily bar + daily_bars.iloc[rid] = ( + day_data.open.iloc[0], # first open price + day_data.high.max(), # max of high prices + day_data.low.min(), # min of low prices + day_data.close.iloc[-1], # last close price + day_data.volume.sum(), # sum of all volumes + ) + + # scale to allow trading 100-ths of a coin + daily_bars.loc[:, 'open'] /= 100.0 + daily_bars.loc[:, 'high'] /= 100.0 + daily_bars.loc[:, 'low'] /= 100.0 + daily_bars.loc[:, 'close'] /= 100.0 + daily_bars.loc[:, 'volume'] *= 100.0 + + return daily_bars + def _pricing_iter(): sid = 0 - - for symbol in symbols: - #def to_dataframe(self, start, end, currencyPair=None): - csv_fn = '/var/tmp/catalyst/data/poloniex/crypto_prices-' + symbol + '.csv' # TODO: DIR as parameter - #last_date = self._get_start_date(csv_fn) - #if last_date + 300 < end or not os.path.exists(csv_fn): - # get latest data - #self.append_data_single_pair(currencyPair) - - # CSV holds the latest snapshot - data = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume']) - data['date'] = pd.to_datetime(data['date'], utc=True, unit='s') - data.set_index('date', inplace=True) - - #df = df.resample('D').mean() - df = data.loc[data.index.isin(calendar.schedule.index)] - - offset = DateOffset(days=1) - for start_date in df.index: - end_date = start_date + offset - day_data = data[start_date:end_date] - - df[start_date]['open'] = day_data[0]['open'] - df[start_date]['high'] = day_data['high'].max() - df[start_date]['low'] = day_data['low'].min() - df[start_date]['close'] = day_data[-1]['close'] - df[start_date]['volume'] = day_data['volume'].sum() - - # the start date is the date of the first trade and - # the end date is the date of the last trade - start_date = df.index[0] - end_date = df.index[-1] - # The auto_close date is the day after the last trade. - ac_date = end_date + pd.Timedelta(days=1) - metadata.iloc[sid] = start_date, end_date, ac_date, symbol - - yield sid, df - sid += 1 - - ''' + print 'Ingesting symbols: {0}'.format(symbols) with maybe_show_progress( - symbols, - show_progress, - label='Downloading Yahoo pricing data: ') as it, \ - requests.Session() as session: + symbols, + show_progress, + show_percent=True, + item_show_func=lambda s: 'building {0}'.format(s) + if s is not None + else 'DONE', + info_sep=' | ', + label='Compiling daily bar pricing datasets:', + ) as it: + for symbol in it: - path = _cachpath(symbol, 'ohlcv') - try: - df = cache[path] - except KeyError: - df = cache[path] = DataReader( - symbol, - 'yahoo', - start, - end, - session=session, - ).sort_index() + #def to_dataframe(self, start, end, currencyPair=None): + csv_fn = '/var/tmp/catalyst/data/poloniex/crypto_prices-' +\ + symbol + '.csv' + + #last_date = self._get_start_date(csv_fn) + #if last_date + 300 < end or not os.path.exists(csv_fn): + # get latest data + #self.append_data_single_pair(currencyPair) + + # CSV holds the latest snapshot + columns = ['date', 'open', 'high', 'low', 'close', 'volume'] + five_min_bars = pd.read_csv(csv_fn, names=columns) + five_min_bars.set_index('date', inplace=True) + five_min_bars.index = pd.to_datetime( + five_min_bars.index, + utc=True, + unit='s', + ) + + daily_bars = compute_daily_bars(five_min_bars) # the start date is the date of the first trade and # the end date is the date of the last trade - start_date = df.index[0] - end_date = df.index[-1] + start_date = daily_bars.index[0].tz_localize(None) + end_date = daily_bars.index[-1].tz_localize(None) # The auto_close date is the day after the last trade. - ac_date = end_date + pd.Timedelta(days=1) + ac_date = end_date + day_offset metadata.iloc[sid] = start_date, end_date, ac_date, symbol - df.rename( - columns={ - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - }, - inplace=True, - ) - yield sid, df + yield sid, daily_bars sid += 1 - ''' - daily_bar_writer.write(_pricing_iter(), show_progress=show_progress) + + + daily_bar_writer.write(_pricing_iter()) symbol_map = pd.Series(metadata.symbol.index, metadata.symbol) @@ -178,7 +185,7 @@ def poloniex_cryptoassets(symbols, start=None, end=None): adjustment_writer.write() return ingest - + # bundle used when creating test data register( diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index 4c4fc94e..18cb445b 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -17,6 +17,7 @@ from collections import OrderedDict import logbook import pandas as pd +import numpy as np from pandas_datareader.data import DataReader import datetime import pytz @@ -253,7 +254,6 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, if data is not None: - print 'benchmark data:\n', data.head() return data # If no cached data was found or it was missing any dates then download the @@ -269,33 +269,82 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, def dateparse(time_in_secs): return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc) + def compute_daily_bars(five_min_bars, schedule): + # filter and copy the entry at the beginning of each session + daily_bars = five_min_bars[ + five_min_bars.index.isin(schedule) + ].copy() + + day_offset = pd.Timedelta(days=1) + + # iterate through session starts doing: + # 1. filter five_min_bars to get all entries in one day + # 2. compute daily bar entry + # 3. record in rid-th row of daily_bars + for rid, start_date in enumerate(daily_bars.index): + # compute beginning of next session + end_date = start_date + day_offset + + # filter for entries session entries + day_data = five_min_bars[ + (five_min_bars.index >= start_date) & + (five_min_bars.index < end_date) + ] + + # compute and record daily bar + daily_bars.iloc[rid] = ( + day_data.open.iloc[0], # first open price + day_data.high.max(), # max of high prices + day_data.low.min(), # min of low prices + day_data.close.iloc[-1], # last close prices + day_data.volume.sum(), # sum of all volumes + ) + + # scale to allow trading 100-ths of a coin + daily_bars.loc[:, 'open'] /= 100.0 + daily_bars.loc[:, 'high'] /= 100.0 + daily_bars.loc[:, 'low'] /= 100.0 + daily_bars.loc[:, 'close'] /= 100.0 + daily_bars.loc[:, 'volume'] *= 100.0 + + return daily_bars + try: - data = pd.read_csv( + # load five minute bars from csv cache + five_min_bars = pd.read_csv( source_filename, names=['date', 'open', 'high', 'low', 'close', 'volume'], index_col=[0], parse_dates=True, date_parser=dateparse, ) - data = data[['close']] + five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s') - print 'loaded benchmark data:\n', data.index + # compute daily bars for open calendar + open_calendar = get_calendar('OPEN') + daily_bars = compute_daily_bars( + five_min_bars, + open_calendar.all_sessions, + ) - data = data[ - (data.index >= (first_date-trading_day)) & - (data.index <= last_date) + # filter daily bars to include first_date and last_date + daily_bars = daily_bars[ + (daily_bars.index >= (first_date - trading_day)) & + (daily_bars.index <= last_date) ] - data = data.pct_change(1).iloc[1:] - print 'writing benchmark data:\n', data.head() + # select close column and compute percent change between days + daily_close = daily_bars[['close']] + daily_close = daily_close.pct_change(1).iloc[1:] - data.to_csv(get_data_filepath(filename, environ)) + # write to benchmark csv cache + daily_close.to_csv(get_data_filepath(filename, environ)) except (OSError, IOError, HTTPError): logger.exception('Failed to cache the new benchmark returns') raise - if not has_data_for_dates(data, first_date, last_date): + if not has_data_for_dates(daily_close, first_date, last_date): logger.warn("Still don't have expected data after redownload!") - return data + return daily_close def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day, diff --git a/catalyst/examples/buy_and_hold.py b/catalyst/examples/buy_and_hold.py index 406229bd..21eb2224 100644 --- a/catalyst/examples/buy_and_hold.py +++ b/catalyst/examples/buy_and_hold.py @@ -22,9 +22,7 @@ from catalyst.api import ( record, ) -stocks = ['USDT_BTC'] - -TARGET_INVESTMENT_RATIO = 0.1 +TARGET_INVESTMENT_RATIO = 0.5 def initialize(context): context.has_ordered = False diff --git a/catalyst/examples/dual_moving_average_btc.py b/catalyst/examples/dual_moving_average_btc.py index f7b4126c..66a645fb 100644 --- a/catalyst/examples/dual_moving_average_btc.py +++ b/catalyst/examples/dual_moving_average_btc.py @@ -46,7 +46,7 @@ SHORT_WINDOW = 30 LONG_WINDOW = 100 def initialize(context): - context.asset = symbol('USDT_LTC') + context.asset = symbol('USDT_BTC') context.i = 0 set_commission(PerDollar(cost=0.001)) @@ -100,12 +100,22 @@ def rebalance(context, data): if data.can_trade(context.asset): # adjust portfolio based on moving averages if short_mavg > long_mavg: - order_target_percent(context.asset, TARGET_INVESTMENT_RATIO) + order_target_percent( + context.asset, + TARGET_INVESTMENT_RATIO, + #limit_price=(2 * price), + #stop_price=(0.5 * price), + ) elif short_mavg < long_mavg: - order_target_percent(context.asset, 0.0) + order_target_percent( + context.asset, + 0.0, + #limit_price=(2 * price), + #stop_price=(0.5 * price), + ) record( - USDT_LTC=price, + USDT_BTC=price, cash=context.portfolio.cash, leverage=context.account.leverage, short_mavg=short_mavg, @@ -124,10 +134,12 @@ def analyze(context=None, results=None): ax1.set_ylabel('Portfolio value (USD)') ax2 = plt.subplot(512, sharex=ax1) - ax2.set_ylabel('USDT_LTC (USD)') - results[['USDT_LTC', 'short_mavg', 'long_mavg']].plot(ax=ax2) + ax2.set_ylabel('USDT_BTC (USD)') + results[['USDT_BTC', 'short_mavg', 'long_mavg']].plot(ax=ax2) trans = results.ix[[t != [] for t in results.transactions]] + amounts = [t[0]['amount'] for t in trans.transactions] + print 'amounts:\n', amounts buys = trans.ix[ [t[0]['amount'] > 0 for t in trans.transactions] ] @@ -136,13 +148,13 @@ def analyze(context=None, results=None): ] print 'buys:', buys.head() ax2.plot( - buys.index, results.USDT_LTC[buys.index], + buys.index, results.USDT_BTC[buys.index], '^', markersize=10, color='m', ) ax2.plot( - sells.index, results.USDT_LTC[sells.index], + sells.index, results.USDT_BTC[sells.index], 'v', markersize=10, color='k', diff --git a/catalyst/pipeline/loaders/crypto_pricing_loader.py b/catalyst/pipeline/loaders/crypto_pricing_loader.py index 32fa3c14..f8676cce 100644 --- a/catalyst/pipeline/loaders/crypto_pricing_loader.py +++ b/catalyst/pipeline/loaders/crypto_pricing_loader.py @@ -37,7 +37,7 @@ class CryptoPricingLoader(PipelineLoader): self.raw_price_loader = raw_price_loader self._columns = dataset.columns - cal = get_calendar('NYSE') + cal = get_calendar('OPEN') self._all_sessions = cal.all_sessions diff --git a/catalyst/utils/calendars/calendar_utils.py b/catalyst/utils/calendars/calendar_utils.py index 0e4824a7..57d97f2d 100644 --- a/catalyst/utils/calendars/calendar_utils.py +++ b/catalyst/utils/calendars/calendar_utils.py @@ -73,6 +73,8 @@ class TradingCalendarDispatcher(object): """ canonical_name = self.resolve_alias(name) + print 'get_calendar:', canonical_name + try: return self._calendars[canonical_name] except KeyError: diff --git a/catalyst/utils/calendars/exchange_calendar_open.py b/catalyst/utils/calendars/exchange_calendar_open.py index 58cd8a58..cf62b8f0 100644 --- a/catalyst/utils/calendars/exchange_calendar_open.py +++ b/catalyst/utils/calendars/exchange_calendar_open.py @@ -15,7 +15,7 @@ class OpenExchangeCalendar(TradingCalendar): @property def tz(self): - return timezone('US/Eastern') + return timezone('UTC') @property def open_time(self): diff --git a/catalyst/utils/calendars/trading_calendar.py b/catalyst/utils/calendars/trading_calendar.py index d30eec33..cccb5f28 100644 --- a/catalyst/utils/calendars/trading_calendar.py +++ b/catalyst/utils/calendars/trading_calendar.py @@ -100,7 +100,7 @@ class TradingCalendar(with_metaclass(ABCMeta)): 'market_open': self._opens, 'market_close': self._closes, }, - dtype='datetime64[ns]', + dtype='datetime64[ns, UTC]', ) # Simple cache to avoid recalculating the same minute -> session in diff --git a/catalyst/utils/factory.py b/catalyst/utils/factory.py index 1009e23c..ae86c99e 100644 --- a/catalyst/utils/factory.py +++ b/catalyst/utils/factory.py @@ -45,7 +45,7 @@ def create_simulation_parameters(year=2006, start=None, end=None, trading_calendar=None): if not trading_calendar: - trading_calendar = get_calendar("NYSE") + trading_calendar = get_calendar("OPEN") if start is None: start = pd.Timestamp("{0}-01-01".format(year), tz='UTC') diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 4c60054e..d4b75207 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -123,7 +123,7 @@ def _run(handle_data, def get_trading_env_and_data(bundles): env = data = None - b = 'catalyst' + b = 'poloniex' if len(bundles) == 0: return env, data elif len(bundles) == 1: @@ -146,9 +146,12 @@ def _run(handle_data, str(bundle_data.asset_finder.engine.url), ) + open_calendar = get_calendar('OPEN') + env = TradingEnvironment( - #load=partial(load_crypto_market_data, environ=environ), - #bm_symbol='USDT_BTC', + load=partial(load_crypto_market_data, environ=environ), + bm_symbol='USDT_BTC', + trading_calendar=open_calendar, asset_db_path=connstr, environ=environ, ) @@ -158,7 +161,7 @@ def _run(handle_data, data = DataPortal( env.asset_finder, - get_calendar('NYSE'), + open_calendar, first_trading_day=first_trading_day, equity_minute_reader=bundle_data.equity_minute_bar_reader, equity_daily_reader=bundle_data.equity_daily_bar_reader,