From 3394614ecfc537e12100f01930f6b113928c9cf7 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Mon, 30 Oct 2017 21:17:53 -0400 Subject: [PATCH] BUG: Fixes issue #47. Made improvements around auto-ingestion. --- catalyst/exchange/bundle_utils.py | 49 +++- catalyst/exchange/exchange_bundle.py | 264 ++++++++++++---------- catalyst/exchange/exchange_data_portal.py | 26 ++- catalyst/exchange/stats_utils.py | 40 ++++ catalyst/support/issue_47.py | 140 ++++++++++++ tests/exchange/test_bundle.py | 65 +++++- 6 files changed, 446 insertions(+), 138 deletions(-) create mode 100644 catalyst/support/issue_47.py diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index acd2cc4b..9a0eb3c2 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -103,34 +103,59 @@ def get_start_dt(end_dt, bar_count, data_frequency): return start_dt -def get_month_start_end(dt): +def get_period_label(dt, data_frequency): """ - Returns the first and last day of the month for the specified date. + The period label for the specified date and frequency. :param dt: + :param data_frequency: + :return: + """ + return '{}-{:02d}'.format(dt.year, dt.month) if data_frequency == 'minute' \ + else '{}'.format(dt.year) + + +def get_month_start_end(dt, first_day=None, last_day=None): + """ + The first and last day of the month for the specified date. + + :param dt: + :param first_day + :param last_day :return: """ month_range = calendar.monthrange(dt.year, dt.month) - month_start = pd.to_datetime(datetime( - dt.year, dt.month, 1, 0, 0, 0, 0 - ), utc=True) - month_end = pd.to_datetime(datetime( - dt.year, dt.month, month_range[1], 23, 59, 0, 0 - ), utc=True) + if first_day: + month_start = first_day + else: + month_start = pd.to_datetime(datetime( + dt.year, dt.month, 1, 0, 0, 0, 0 + ), utc=True) + + if last_day: + month_end = last_day + else: + month_end = pd.to_datetime(datetime( + dt.year, dt.month, month_range[1], 23, 59, 0, 0 + ), utc=True) return month_start, month_end -def get_year_start_end(dt): +def get_year_start_end(dt, first_day=None, last_day=None): """ - Returns the first and last day of the year for the specified date. + The first and last day of the year for the specified date. :param dt: + :param first_day + :param last_day :return: """ - year_start = pd.to_datetime(date(dt.year, 1, 1), utc=True) - year_end = pd.to_datetime(date(dt.year, 12, 31), utc=True) + year_start = first_day if first_day \ + else pd.to_datetime(date(dt.year, 1, 1), utc=True) + year_end = last_day if last_day \ + else pd.to_datetime(date(dt.year, 12, 31), utc=True) return year_start, year_end diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 353c5fc9..ec499334 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,9 +1,14 @@ import os +import os import shutil -from datetime import timedelta +from itertools import chain import pandas as pd +from catalyst.assets._assets import TradingPair from logbook import Logger +from pandas.tslib import Timestamp +from pytz import UTC +from six import itervalues from catalyst import get_calendar from catalyst.constants import LOG_LEVEL @@ -11,11 +16,11 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ BcolzMinuteBarMetadata from catalyst.exchange.bundle_utils import range_in_bundle, \ get_bcolz_chunk, get_delta, get_month_start_end, \ - get_year_start_end, get_df_from_arrays, get_start_dt + get_year_start_end, get_df_from_arrays, get_start_dt, get_period_label from catalyst.exchange.exchange_bcolz import BcolzExchangeBarReader, \ BcolzExchangeBarWriter from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ - InvalidHistoryFrequencyError, TempBundleNotFoundError, \ + TempBundleNotFoundError, \ NoDataAvailableOnExchange, \ PricingDataNotLoadedError from catalyst.exchange.exchange_utils import get_exchange_folder @@ -383,7 +388,7 @@ class ExchangeBundle: """ reader = self.get_reader(data_frequency) - chunks = [] + chunks = dict() for asset in assets: try: # Checking if the the asset has price data in the specified @@ -397,98 +402,57 @@ class ExchangeBundle: log.debug('skipping {}: {}'.format(asset.symbol, e)) continue - # This is either the first trading day of the asset or the - # first session available in the calendar - first_trading_dt = asset.start_date \ - if asset.start_date > self.calendar.first_session \ - else self.calendar.first_session + dates = pd.date_range( + start=get_period_label(adj_start, data_frequency), + end=get_period_label(adj_end, data_frequency), + freq='MS' if data_frequency == 'minute' else 'AS', + tz=UTC + ) - # Aligning start / end dates with the daily calendar - sessions = self.calendar.sessions_in_range(adj_start, adj_end) + # Adjusting the last date of the range to avoid + # going over the asset's trading bounds + dates.values[0] = adj_start + dates.values[-1] = adj_end - # We loop through each session to create chunks for each period - chunk_labels = [] - dt = sessions[0] - while dt <= sessions[-1]: - label = '{}-{:02d}'.format(dt.year, dt.month) \ - if data_frequency == 'minute' else '{}'.format(dt.year) + chunks[asset] = [] + for index, dt in enumerate(dates): + get_start_end = get_month_start_end \ + if data_frequency == 'minute' else get_year_start_end - if label not in chunk_labels: - chunk_labels.append(label) + period_start, period_end = get_start_end( + dt=dt, + first_day=dt if index == 0 else None, + last_day=dt if index == len(dates) - 1 else None + ) - # Adjusting the period dates to match the availability - # of the trading pair - if data_frequency == 'minute': - period_start, period_end = get_month_start_end(dt) + # Currencies don't always start trading at midnight. + # Checking the last minute of the day instead. + range_start = period_start.replace(hour=23, minute=59) \ + if data_frequency == 'minute' else period_start - asset_start_month, _ = get_month_start_end( - first_trading_dt + # Checking if the data already exists in the bundle + # for the date range of the chunk. If not, we create + # a chunk for ingestion. + has_data = range_in_bundle( + asset, range_start, period_end, reader + ) + if not has_data: + chunks[asset].append( + dict( + asset=asset, + period_start=period_start, + period_end=period_end, + period=get_period_label(dt, data_frequency) ) - if asset_start_month == period_start \ - and period_start < first_trading_dt: - period_start = first_trading_dt - - # TODO: need to filter closed pairs? - _, asset_end_month = get_month_start_end( - asset.end_minute - ) - if asset_end_month == period_end \ - and period_end > asset.end_minute: - period_end = asset.end_minute - - elif data_frequency == 'daily': - period_start, period_end = get_year_start_end(dt) - - asset_start_year, _ = get_year_start_end( - first_trading_dt - ) - if asset_start_year == period_start \ - and period_start < first_trading_dt: - period_start = first_trading_dt - - _, asset_end_year = get_year_start_end( - asset.end_daily - ) - if asset_end_year == period_end \ - and period_end > asset.end_daily: - period_end = asset.end_daily - - else: - raise InvalidHistoryFrequencyError( - frequency=data_frequency - ) - - # Currencies don't always start trading at midnight. - # Checking the last minute of the day instead. - range_start = period_start.replace(hour=23, minute=59) \ - if data_frequency == 'minute' else period_start - - # Checking if the data already exists in the bundle - # for the date range of the chunk. If not, we create - # a chunk for ingestion. - has_data = range_in_bundle( - asset, range_start, period_end, reader ) - if not has_data: - log.debug('adding period: {}'.format(label)) - chunks.append( - dict( - asset=asset, - period_start=period_start, - period_end=period_end, - period=label - ) - ) - dt += timedelta(days=1) - - # We sort the chunks by end date to ingest most recent data first - chunks.sort(key=lambda chunk: chunk['period_end']) + # We sort the chunks by end date to ingest most recent data first + chunks[asset].sort(key=lambda chunk: chunk['period_end']) return chunks - def ingest_assets(self, assets, start_dt, end_dt, data_frequency, - show_progress=False): + def ingest_assets(self, assets, data_frequency, start_dt=None, end_dt=None, + show_progress=False, asset_chunks=False): """ Determine if data is missing from the bundle and attempt to ingest it. @@ -497,6 +461,16 @@ class ExchangeBundle: :param end_dt: :return: """ + + if start_dt is None: + start_dt = self.calendar.first_session + + if end_dt is None: + end_dt = pd.Timestamp.utcnow() + + start_dt, end_dt = self.get_adj_dates( + start_dt, end_dt, assets, data_frequency + ) chunks = self.prepare_chunks( assets=assets, data_frequency=data_frequency, @@ -507,7 +481,8 @@ class ExchangeBundle: # Since chunks are either monthly or yearly, it is possible that # our ingestion data range is greater than specified. We adjust # the boundaries to ensure that the writer can write all data. - for chunk in chunks: + all_chunks = list(chain.from_iterable(itervalues(chunks))) + for chunk in all_chunks: if chunk['period_start'] < start_dt: start_dt = chunk['period_start'] @@ -515,24 +490,49 @@ class ExchangeBundle: end_dt = chunk['period_end'] writer = self.get_writer(start_dt, end_dt, data_frequency) - with maybe_show_progress( - chunks, - show_progress, - label='Fetching {exchange} {frequency} candles: '.format( - exchange=self.exchange.name, - frequency=data_frequency - )) as it: - for chunk in it: - self.ingest_ctable( - asset=chunk['asset'], - data_frequency=data_frequency, - period=chunk['period'], - start_dt=chunk['period_start'], - end_dt=chunk['period_end'], - writer=writer, - empty_rows_behavior='strip', - cleanup=True - ) + + if asset_chunks: + for asset in chunks: + with maybe_show_progress( + chunks[asset], + show_progress, + label='Ingesting {frequency} price data for ' + '{symbol} on {exchange}'.format( + exchange=self.exchange.name, + frequency=data_frequency, + symbol=asset.symbol + )) as it: + for chunk in it: + self.ingest_ctable( + asset=chunk['asset'], + data_frequency=data_frequency, + period=chunk['period'], + start_dt=chunk['period_start'], + end_dt=chunk['period_end'], + writer=writer, + empty_rows_behavior='strip', + cleanup=True + ) + else: + with maybe_show_progress( + all_chunks, + show_progress, + label='Ingesting {frequency} price data on ' + '{exchange}'.format( + exchange=self.exchange.name, + frequency=data_frequency, + )) as it: + for chunk in it: + self.ingest_ctable( + asset=chunk['asset'], + data_frequency=data_frequency, + period=chunk['period'], + start_dt=chunk['period_start'], + end_dt=chunk['period_end'], + writer=writer, + empty_rows_behavior='strip', + cleanup=True + ) def ingest(self, data_frequency, include_symbols=None, exclude_symbols=None, start=None, end=None, @@ -549,20 +549,30 @@ class ExchangeBundle: :return: """ assets = self.get_assets(include_symbols, exclude_symbols) - start_dt, end_dt = self.get_adj_dates( - start, end, assets, data_frequency - ) for frequency in data_frequency.split(','): - self.ingest_assets(assets, start_dt, end_dt, frequency, + self.ingest_assets(assets, frequency, start, end, show_progress) def get_history_window_series_and_load(self, - assets, - end_dt, - bar_count, - field, - data_frequency): + assets, # type: List[TradingPair] + end_dt, # type: Timestamp + bar_count, # type: int + field, # type: str + data_frequency, # type: str + algo_end_dt=None # type: Timestamp + ): + # type: (...) -> Dict[str, Series] + """ + Retrieve price data history, ingest missing data. + + :param assets: + :param end_dt: + :param bar_count: + :param field: + :param data_frequency: + :return: + """ try: series = self.get_history_window_series( assets=assets, @@ -586,9 +596,10 @@ class ExchangeBundle: self.ingest_assets( assets=assets, start_dt=start_dt, - end_dt=end_dt, + end_dt=algo_end_dt, data_frequency=data_frequency, - show_progress=True + show_progress=True, + asset_chunks=True ) series = self.get_history_window_series( assets=assets, @@ -596,12 +607,29 @@ class ExchangeBundle: bar_count=bar_count, field=field, data_frequency=data_frequency, - reset_reader=True + reset_reader=False ) return series - def get_spot_values(self, assets, field, dt, data_frequency, - reset_reader=False): + def get_spot_values(self, + assets, # type: List[TradingPair] + field, # type: str + dt, # type: Timestamp + data_frequency, # type: str + reset_reader=False # type: bool + ): + # type: (...) -> List[float] + """ + The spot values for the gives assets, field and date. Reads from + the exchange data bundle. + + :param assets: + :param field: + :param dt: + :param data_frequency: + :param reset_reader: + :return: + """ values = [] try: reader = self.get_reader(data_frequency) diff --git a/catalyst/exchange/exchange_data_portal.py b/catalyst/exchange/exchange_data_portal.py index 3371c584..8d2cf997 100644 --- a/catalyst/exchange/exchange_data_portal.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -301,7 +301,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): :param ffill: :return: """ - bundle = self.exchange_bundles[exchange.name] + bundle = self.exchange_bundles[exchange.name] # type: ExchangeBundle candle_size, unit, data_frequency = get_frequency( frequency, data_frequency @@ -313,14 +313,32 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): end_dt=end_dt, bar_count=adj_bar_count, field=field, - data_frequency=data_frequency + data_frequency=data_frequency, + algo_end_dt=self._last_available_session, ) df = resample_history_df(pd.DataFrame(series), candle_size, field) return df - def get_exchange_spot_value(self, exchange, assets, field, dt, - data_frequency): + def get_exchange_spot_value(self, + exchange, # type: Exchange + assets, # type: List[TradingPair] + field, # type: str + dt, # type: Timestamp + data_frequency # type: str + ): + # type: (...) -> float + """ + A spot value for the exchange bundle. Try to ingest data if not in + the bundle. + + :param exchange: + :param assets: + :param field: + :param dt: + :param data_frequency: + :return: + """ bundle = self.exchange_bundles[exchange.name] if data_frequency == 'daily': diff --git a/catalyst/exchange/stats_utils.py b/catalyst/exchange/stats_utils.py index 602694d6..bd968dfe 100644 --- a/catalyst/exchange/stats_utils.py +++ b/catalyst/exchange/stats_utils.py @@ -1,4 +1,44 @@ import pandas as pd +import numpy as np + + +def crossover(source, target): + """ + The `x`-series is defined as having crossed over `y`-series if the value + of `x` is greater than the value of `y` and the value of `x` was less than + the value of `y` on the bar immediately preceding the current bar. + + :param source: + :param target: + :return: + """ + if source[-1] is np.nan or source[-2] is np.nan \ + or target[-1] is np.nan or target[-2] is np.nan: + return False + + if source[-1] > target[-1] and source[-2] < target[-2]: + return True + else: + return False + + +def crossunder(source, target): + """ + The `x`-series is defined as having crossed under `y`-series if the value + of `x` is less than the value of `y` and the value of `x` was greater than + the value of `y` on the bar immediately preceding the current bar. + :param source: + :param target: + :return: + """ + if source[-1] is np.nan or source[-2] is np.nan \ + or target[-1] is np.nan or target[-2] is np.nan: + return False + + if source[-1] < target[-1] and source[-2] > target[-2]: + return True + else: + return False def get_pretty_stats(stats_df, recorded_cols=None, num_rows=10): diff --git a/catalyst/support/issue_47.py b/catalyst/support/issue_47.py new file mode 100644 index 00000000..5341dd03 --- /dev/null +++ b/catalyst/support/issue_47.py @@ -0,0 +1,140 @@ +""" +Requires Catalyst version 0.3.0 or above +Tested on Catalyst version 0.3.2 + +These example aims to provide and easy way for users to learn how to collect data from the different exchanges. +You simply need to specify the exchange and the market that you want to focus on. +You will all see how to create a universe and filter it base on the exchange and the market you desire. + +The example prints out the closing price of all the pairs for a given market-exchange every 30 minutes. +The example also contains the ohlcv minute data for the past seven days which could be used to create indicators +Use this as the backbone to create your own trading strategies. + +Variables lookback date and date are used to ensure data for a coin existed on the lookback period specified. +""" + +import numpy as np +import pandas as pd +from datetime import timedelta +from catalyst import run_algorithm +from catalyst.exchange.exchange_utils import get_exchange_symbols + +from catalyst.api import ( + symbols, +) + + +def initialize(context): + context.i = -1 # counts the minutes + context.exchange = 'poloniex' # must match the exchange specified in run_algorithm + context.base_currency = 'eth' # must match the base currency specified in run_algorithm + + +def handle_data(context, data): + lookback = 60 * 24 * 7 # (minutes, hours, days) of how far to lookback in the data history + context.i += 1 + + # current date formatted into a string + today = context.blotter.current_dt + date, time = today.strftime('%Y-%m-%d %H:%M:%S').split(' ') + lookback_date = today - timedelta(days=( + lookback / (60 * 24))) # subtract the amount of days specified in lookback + lookback_date = lookback_date.strftime('%Y-%m-%d %H:%M:%S').split(' ')[ + 0] # get only the date as a string + + # update universe everyday + new_day = 60 * 24 + if not context.i % new_day: + context.universe = universe(context, lookback_date, date) + + # get data every 30 minutes + minutes = 30 + if not context.i % minutes and context.universe: + # we iterate for every pair in the current universe + for coin in context.coins: + pair = str(coin.symbol) + + # 30 minute interval ohlcv data (the standard data required for candlestick or indicators/signals) + # 30T means 30 minutes re-sampling of one minute data. change to your desire time interval. + open = fill(data.history(coin, 'open', bar_count=lookback, + frequency='1m')).resample('30T').first() + high = fill(data.history(coin, 'high', bar_count=lookback, + frequency='1m')).resample('30T').max() + low = fill(data.history(coin, 'low', bar_count=lookback, + frequency='1m')).resample('30T').min() + close = fill(data.history(coin, 'price', bar_count=lookback, + frequency='1m')).resample('30T').last() + volume = fill(data.history(coin, 'volume', bar_count=lookback, + frequency='1m')).resample('30T').sum() + + # close[-1] is the equivalent to current price + # displays the minute price for each pair every 30 minutes + print( + today, pair, open[-1], high[-1], low[-1], close[-1], volume[-1]) + + # ---------------------------------------------------------------------------------------------------------- + # -------------------------------------- Insert Your Strategy Here ----------------------------------------- + # ---------------------------------------------------------------------------------------------------------- + + +def analyze(context=None, results=None): + pass + + +# Get the universe for a given exchange and a given base_currency market +# Example: Poloniex BTC Market +def universe(context, lookback_date, current_date): + json_symbols = get_exchange_symbols( + context.exchange) # get all the pairs for the exchange + universe_df = pd.DataFrame.from_dict(json_symbols).transpose().astype( + str) # convert into a dataframe + universe_df['base_currency'] = universe_df.apply( + lambda row: row.symbol.split('_')[1], + axis=1) + universe_df['market_currency'] = universe_df.apply( + lambda row: row.symbol.split('_')[0], + axis=1) + # Filter all the exchange pairs to only the ones for a give base currency + universe_df = universe_df[ + universe_df['base_currency'] == context.base_currency] + + # Filter all the pairs to ensure that pair existed in the current date range + universe_df = universe_df[universe_df.start_date < lookback_date] + universe_df = universe_df[universe_df.end_daily >= current_date] + context.coins = symbols( + *universe_df.symbol) # convert all the pairs to symbols + print(universe_df.head(), len(universe_df)) + return universe_df.symbol.tolist() + + +# Replace all NA, NAN or infinite values with its nearest value +def fill(series): + if isinstance(series, pd.Series): + return series.replace([np.inf, -np.inf], np.nan).ffill().bfill() + elif isinstance(series, np.ndarray): + return pd.Series(series).replace([np.inf, -np.inf], + np.nan).ffill().bfill().values + else: + return series + + +if __name__ == '__main__': + start_date = pd.to_datetime('2017-01-01', utc=True) + end_date = pd.to_datetime('2017-10-15', utc=True) + + performance = run_algorithm(start=start_date, end=end_date, + capital_base=10000.0, + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='poloniex', + data_frequency='minute', + base_currency='eth', + live=False, + live_graph=False, + algo_namespace='simple_universe') + +""" +Run in Terminal (inside catalyst environment): +python simple_universe.py +""" diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 69b31400..060592ec 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -1,17 +1,20 @@ import hashlib +import tempfile from logging import getLogger +import os import pandas as pd from catalyst import get_calendar from catalyst.exchange.bundle_utils import get_bcolz_chunk, \ - get_periods_range, get_start_dt + get_periods_range, get_start_dt, get_month_start_end, get_df_from_arrays, \ + get_year_start_end from catalyst.exchange.exchange_bcolz import BcolzExchangeBarReader, \ BcolzExchangeBarWriter from catalyst.exchange.exchange_bundle import ExchangeBundle, \ BUNDLE_NAME_TEMPLATE from catalyst.exchange.exchange_utils import get_exchange_folder -from catalyst.exchange.init_utils import get_exchange +from catalyst.exchange.factory import get_exchange from catalyst.exchange.stats_utils import df_to_string from catalyst.utils.paths import ensure_directory @@ -45,11 +48,11 @@ class TestExchangeBundle: exchange = get_exchange(exchange_name) exchange_bundle = ExchangeBundle(exchange) assets = [ - exchange.get_asset('iot_btc') + exchange.get_asset('xmr_btc') ] # start = pd.to_datetime('2017-09-01', utc=True) - start = pd.to_datetime('2017-9-01', utc=True) + start = pd.to_datetime('2016-01-01', utc=True) end = pd.to_datetime('2017-9-30', utc=True) log.info('ingesting exchange bundle {}'.format(exchange_name)) @@ -426,3 +429,57 @@ class TestExchangeBundle: df = pd.DataFrame(bundle_series) print('\n' + df_to_string(df)) pass + + def bundle_to_csv(self): + exchange_name = 'poloniex' + data_frequency = 'daily' + period = '2016' + + exchange = get_exchange(exchange_name) + bundle = ExchangeBundle(exchange) + asset = exchange.get_asset('xmr_btc') + + path = get_bcolz_chunk( + exchange_name=exchange.name, + symbol=asset.symbol, + data_frequency=data_frequency, + period=period + ) + + dt = pd.to_datetime(period, utc=True) + if data_frequency == 'minute': + start_dt, end_dt = get_month_start_end(dt) + else: + start_dt, end_dt = get_year_start_end(dt) + + reader = bundle.get_reader(data_frequency, path=path) + arrays = None + try: + arrays = reader.load_raw_arrays( + sids=[asset.sid], + fields=['open', 'high', 'low', 'close', 'volume'], + start_dt=start_dt, + end_dt=end_dt + ) + except Exception as e: + log.warn('skipping ctable for {} from {} to {}: {}'.format( + asset.symbol, start_dt, end_dt, e + )) + + periods = bundle.get_calendar_periods_range( + start_dt, end_dt, data_frequency + ) + df = get_df_from_arrays(arrays, periods) + + folder = os.path.join( + tempfile.gettempdir(), 'catalyst', exchange.name, asset.symbol + ) + ensure_directory(folder) + + path = os.path.join(folder, period + '.csv') + + log.info('creating csv file: {}'.format(path)) + print('HEAD\n{}'.format(df.head(10))) + print('TAIL\n{}'.format(df.tail(10))) + df.to_csv(path) + pass