From 93f4d31399bf8e8e28dcdeae684bb07e66407479 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Fri, 13 Oct 2017 16:29:43 -0400 Subject: [PATCH] Unit tested ingestion of bundle chunks. This may not be stable yet. --- .../buy_low_sell_high_neo_with_interface.py | 1 + catalyst/exchange/bundle_utils.py | 41 ++++- catalyst/exchange/exchange_bundle.py | 161 ++++++------------ catalyst/exchange/exchange_utils.py | 8 + catalyst/exchange/init_utils.py | 8 + tests/exchange/test_bundle.py | 27 ++- 6 files changed, 118 insertions(+), 128 deletions(-) diff --git a/catalyst/examples/buy_low_sell_high_neo_with_interface.py b/catalyst/examples/buy_low_sell_high_neo_with_interface.py index 7037afad..e02b50de 100644 --- a/catalyst/examples/buy_low_sell_high_neo_with_interface.py +++ b/catalyst/examples/buy_low_sell_high_neo_with_interface.py @@ -91,6 +91,7 @@ def _handle_data(context, data): elif position.amount > 0 and \ price > cost_basis * (1 + context.PROFIT_TARGET): profit = (price * position.amount) - (cost_basis * position.amount) + log.info('closing position, taking profit: {}'.format(profit)) order_target_percent( asset=context.asset, diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index 2f87b6c9..92af9b10 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,3 +1,6 @@ +import gzip +import tarfile + import requests from datetime import timedelta, datetime import os @@ -8,7 +11,9 @@ import numpy as np import pytz from catalyst.data.bundles import from_bundle_ingest_dirname +from catalyst.data.bundles.core import download_without_progress from catalyst.exchange.exchange_errors import ApiCandlesError +from catalyst.exchange.exchange_utils import get_exchange_bundles_folder from catalyst.utils.deprecate import deprecated from catalyst.utils.paths import data_path @@ -29,21 +34,43 @@ def get_seconds_from_date(date): return int((date - epoch).total_seconds()) -def get_bcolz_chunk(exchange_name, data_frequency, symbol, period_a, period_b): +def get_bcolz_chunk(exchange_name, symbol, data_frequency, period): """ + Download and extract a bcolz bundle. :param exchange_name: - :param data_frequency: :param symbol: - :param period_a: - Example: 2017 - :param period_b: - Example: 10 + :param data_frequency: + :param period: + :return: Note: Filename: bitfinex-daily-neo_eth-2017-10.tar.gz - :return: """ + + root = get_exchange_bundles_folder(exchange_name) + name = '{exchange}-{frequency}-{symbol}-{period}'.format( + exchange=exchange_name, + frequency=data_frequency, + symbol=symbol, + period=period + ) + path = os.path.join(root, name) + + if not os.path.isdir(path): + url = 'https://s3.amazonaws.com/enigmaco/catalyst-bundles/' \ + 'exchange-{exchange}/{name}.tar.gz'.format( + exchange=exchange_name, + name=name + ) + + bytes = download_without_progress(url) + with tarfile.open('r', fileobj=bytes) as tar: + tar.extractall(path) + + return path + + def get_history(exchange_name, data_frequency, symbol, start=None, end=None): """ History API provides OHLCV data for any of the supported exchanges up to yesterday. diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index a843a777..b45f965e 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,10 +1,10 @@ +import calendar import os import time -from datetime import timedelta +from datetime import timedelta, datetime, date -import bcolz import pandas as pd -from logbook import Logger, DEBUG, INFO +from logbook import Logger, INFO from catalyst import get_calendar from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ @@ -12,8 +12,9 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \ - get_periods, range_in_bundle -from catalyst.exchange.exchange_utils import get_exchange_folder + get_periods, range_in_bundle, get_bcolz_chunk +from catalyst.exchange.exchange_utils import get_exchange_folder, \ + get_exchange_bundles_folder from catalyst.utils.cli import maybe_show_progress from catalyst.utils.deprecate import deprecated from catalyst.utils.paths import ensure_directory @@ -35,6 +36,7 @@ class ExchangeBundle: self.default_ohlc_ratio = 1000000 self._writers = dict() self._readers = dict() + self.calendar = get_calendar('OPEN') def get_assets(self, include_symbols, exclude_symbols): # TODO: filter exclude symbols assets @@ -117,8 +119,6 @@ class ExchangeBundle: if key in self._writers: return self._writers[key] - open_calendar = get_calendar('OPEN') - root = get_exchange_folder(self.exchange.name) output_dir = BUNDLE_NAME_TEMPLATE.format( root=root, @@ -159,7 +159,7 @@ class ExchangeBundle: else: self._writers[key] = BcolzMinuteBarWriter( rootdir=output_dir, - calendar=open_calendar, + calendar=self.calendar, minutes_per_day=self.minutes_per_day, start_session=start_dt, end_session=end_dt, @@ -175,7 +175,7 @@ class ExchangeBundle: end_session = end_dt.floor('1d') self._writers[key] = BcolzDailyBarWriter( filename=output_dir, - calendar=open_calendar, + calendar=self.calendar, start_session=start_dt, end_session=end_session ) @@ -318,18 +318,31 @@ class ExchangeBundle: return data - def ingest_ctable(self, asset, data_frequency, path): + def download_bundle(self, name): + """ + + :param name: + :return: + """ + + def ingest_ctable(self, asset, data_frequency, period, writer): start_time = time.time() + path = get_bcolz_chunk( + exchange_name=self.exchange.name, + symbol=asset.symbol, + data_frequency=data_frequency, + period=period + ) + reader = BcolzMinuteBarReader(path) start = reader.first_trading_day end = reader.last_available_dt - open_calendar = get_calendar('OPEN') - periods = open_calendar.minutes_in_range(start, end) + periods = self.calendar.minutes_in_range(start, end) - sid = 284 + sid = asset.sid arrays = reader.load_raw_arrays( fields=['open', 'high', 'low', 'close', 'volume'], start_dt=start, @@ -352,12 +365,10 @@ class ExchangeBundle: data = [] if not df.empty: - df.sort_index(inplace=True) + df.sort_index(inplace=True, ascending=False) data.append((sid, df)) - writer = self.get_writer(start, end, data_frequency) - self._write(data, writer, data_frequency) end_time = time.time() @@ -371,7 +382,6 @@ class ExchangeBundle: exclude_symbols=None, start=None, end=None, show_progress=True, environ=os.environ): """ - Ingest the bundle :param data_frequency: :param include_symbols: @@ -385,113 +395,54 @@ class ExchangeBundle: assets = self.get_assets(include_symbols, exclude_symbols) start, end = self.get_adj_dates(start, end, assets) - - symbols = list(map(lambda asset: asset.symbol, assets)) - log.info( - 'ingesting trading pairs {symbols} on exchange {exchange} ' - 'from {start} to {end}'.format( - symbols=symbols, - exchange=self.exchange.name, - start=start, - end=end - ) - ) - - writer = self.get_writer(start, end, data_frequency) reader = self.get_reader(data_frequency) - all_chunks = [] + chunks = [] + periods = [] for asset in assets: - try: - asset_start, asset_end = \ - self.get_adj_dates(start, end, [asset]) + asset_start, asset_end = self.get_adj_dates(start, end, [asset]) + sessions = self.calendar.sessions_in_range(asset_start, asset_end) - except ValueError as e: - log.debug('asset outside of range {} {}'.format(asset, e)) - continue + dt = sessions[0] + while dt <= sessions[-1]: + period = '{}-{}'.format(dt.year, dt.month) - asset_periods = get_periods(asset_start, asset_end, data_frequency) - if asset_periods > self.exchange.num_candles_limit: - bar_count = self.exchange.num_candles_limit + if period not in periods: + periods.append(period) - chunks = [] + month_range = calendar.monthrange(dt.year, dt.month) + month_start = date(dt.year, dt.month, month_range[0]) + month_end = date(dt.year, dt.month, month_range[1]) - period_delta = timedelta(minutes=1) \ - if data_frequency == 'minute' else \ - timedelta(days=1) - - chunk_start = asset_start.floor('1 min') - period_delta - while chunk_start < asset_end: - delta = timedelta(minutes=bar_count) \ - if data_frequency == 'minute' else \ - timedelta(days=bar_count) - - chunk_end = chunk_start + delta \ - if chunk_start + delta < asset_end else asset_end - - chunk_periods = \ - get_periods(chunk_start, chunk_end, data_frequency) - - range_start = \ - get_start_dt(chunk_end, chunk_periods, data_frequency) - - if range_in_bundle(asset, range_start, chunk_end, reader): - log.debug( - 'chunk already ingested {symbol} ' - '{start} to {end}'.format( - symbol=asset.symbol, - start=range_start, - end=chunk_end + if not range_in_bundle(asset, month_start, month_end, + reader): + log.debug('adding period: {}'.format(period)) + chunks.append( + dict( + asset=asset, + period_end=month_end, + period=period ) ) - chunk_start = chunk_end + period_delta - continue + dt += timedelta(days=1) - chunk = dict( - asset=asset, - end=chunk_end, - bar_count=chunk_periods - ) - chunks.append(chunk) - - chunk_start = chunk_end + period_delta - - all_chunks += chunks - - else: - if range_in_bundle(asset, asset_start, asset_end, reader): - log.debug( - 'asset already ingested {symbol} ' - '{start} to {end}'.format( - symbol=asset.symbol, - start=asset_start, - end=asset_end - ) - ) - continue - - all_chunks += [ - dict(asset=asset, end=asset_end, bar_count=asset_periods) - ] - - all_chunks.sort(key=lambda chunk: chunk['end']) + chunks.sort(key=lambda chunk: chunk['period_end']) + writer = self.get_writer(start, end, data_frequency) with maybe_show_progress( - all_chunks, + chunks, show_progress, label='Fetching {exchange} {frequency} candles: '.format( exchange=self.exchange.name, frequency=data_frequency )) as it: - previous_candle = dict() for chunk in it: - self.ingest_chunk( - bar_count=chunk['bar_count'], - end_dt=chunk['end'], - data_frequency=data_frequency, + self.ingest_ctable( asset=chunk['asset'], - writer=writer, - previous_candle=previous_candle, + data_frequency=data_frequency, + period=chunk['period'], + writer=writer ) + pass diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index c6884d73..1e73805a 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -162,6 +162,14 @@ def get_exchange_minute_writer_root(exchange_name, environ=None): return minute_data_folder +def get_exchange_bundles_folder(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + + temp_bundles = os.path.join(exchange_folder, 'temp_bundles') + ensure_directory(temp_bundles) + + return temp_bundles + def perf_serial(obj): """JSON serializer for objects not serializable by default json code""" diff --git a/catalyst/exchange/init_utils.py b/catalyst/exchange/init_utils.py index 446fe1cb..a37f0441 100644 --- a/catalyst/exchange/init_utils.py +++ b/catalyst/exchange/init_utils.py @@ -2,6 +2,7 @@ from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.bittrex.bittrex import Bittrex from catalyst.exchange.exchange_errors import ExchangeNotFoundError from catalyst.exchange.exchange_utils import get_exchange_auth +from catalyst.exchange.poloniex.poloniex import Poloniex def get_exchange(exchange_name): @@ -20,5 +21,12 @@ def get_exchange(exchange_name): base_currency=None, portfolio=None ) + elif exchange_name == 'poloniex': + return Poloniex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=None, + portfolio=None + ) else: raise ExchangeNotFoundError(exchange_name=exchange_name) diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 78e7fd31..8f4e5a8f 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -71,27 +71,22 @@ class ExchangeBundleTestCase: pass def test_merge_ctables(self): - exchange_name = 'bitfinex' + exchange_name = 'poloniex' + data_frequency = 'minute' - root = '/Users/fredfortier/.catalyst/data/exchanges/bitfinex/temp_bundles' - path = '00/02/000284.bcolz' - - august = '{}/{}'.format( - root, 'poloniex-minute-btc_usdt-2017-8' - ) exchange = get_exchange(exchange_name) - asset = exchange.get_asset('btc_usd') + asset = exchange.get_asset('btc_usdt') + + start = pd.to_datetime('2017-09-01', utc=True) + end = pd.to_datetime('2017-09-06', utc=True) exchange_bundle = ExchangeBundle(exchange) + + writer = exchange_bundle.get_writer(start, end, data_frequency) exchange_bundle.ingest_ctable( asset=asset, - data_frequency='minute', - path=august + data_frequency=data_frequency, + period='2017-9', + writer=writer ) - - september = '{}/{}/{}'.format( - root, 'poloniex-minute-btc_usdt-2017-9', path - ) - zseptember = bcolz.open(september, mode='a') - pass