From 403be971435ac61f7eb192dff1871a23f32db7aa Mon Sep 17 00:00:00 2001 From: fredfortier Date: Sun, 8 Oct 2017 02:27:13 -0400 Subject: [PATCH] Integrating with history api --- catalyst/assets/_assets.pyx | 25 +++++++++++++++++--- catalyst/exchange/bundle_utils.py | 31 ++++++++++++++++--------- catalyst/exchange/exchange.py | 15 +++++++++++- catalyst/exchange/exchange_bundle.py | 34 ++++++++++++++++++++-------- tests/exchange/test_bundle.py | 21 ++++++++++++++++- 5 files changed, 100 insertions(+), 26 deletions(-) diff --git a/catalyst/assets/_assets.pyx b/catalyst/assets/_assets.pyx index b98127f7..986d2174 100644 --- a/catalyst/assets/_assets.pyx +++ b/catalyst/assets/_assets.pyx @@ -395,6 +395,9 @@ cdef class TradingPair(Asset): cdef readonly float leverage cdef readonly object market_currency cdef readonly object base_currency + cdef readonly object end_daily + cdef readonly object end_minute + cdef readonly object exchange_symbol _kwargnames = frozenset({ 'sid', @@ -409,7 +412,10 @@ cdef class TradingPair(Asset): 'leverage', 'market_currency', 'base_currency', - 'min_trade_size', + 'end_daily', + 'end_minute', + 'exchange_symbol', + 'min_trade_size' }) def __init__(self, object symbol, @@ -418,7 +424,10 @@ cdef class TradingPair(Asset): object asset_name=None, int sid=0, float leverage=1.0, + object end_daily=None, + object end_minute=None, object end_date=None, + object exchange_symbol=None, object first_traded=None, object auto_close_date=None, object exchange_full=None, @@ -474,7 +483,10 @@ cdef class TradingPair(Asset): :param asset_name: :param sid: :param leverage: + :param end_daily + :param end_minute :param end_date: + :param exchange_symbol: :param first_traded: :param auto_close_date: :param exchange_full: @@ -516,6 +528,9 @@ cdef class TradingPair(Asset): ) self.leverage = leverage + self.end_daily = end_daily + self.end_minute = end_minute + self.exchange_symbol = exchange_symbol def __repr__(self): return 'Trading Pair {symbol}({sid}) Exchange: {exchange}, ' \ @@ -523,7 +538,9 @@ cdef class TradingPair(Asset): 'Market Currency: {market_currency}, ' \ 'Base Currency: {base_currency}, ' \ 'Exchange Leverage: {leverage}, ' \ - 'Minimum Trade Size: {min_trade_size}'.format( + 'Minimum Trade Size: {min_trade_size} ' \ + 'Last daily ingestion: {end_daily} ' \ + 'Last minutely ingestion: {end_minute}'.format( symbol=self.symbol, sid=self.sid, exchange=self.exchange, @@ -531,7 +548,9 @@ cdef class TradingPair(Asset): market_currency=self.market_currency, base_currency=self.base_currency, leverage=self.leverage, - min_trade_size=self.min_trade_size + min_trade_size=self.min_trade_size, + end_daily=self.end_daily, + end_minute=self.end_minute ) cpdef __reduce__(self): diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index d863fb94..fe71e025 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -2,6 +2,8 @@ import datetime, requests import os from logging import Logger +import pytz + from catalyst.data.bundles import from_bundle_ingest_dirname from catalyst.utils.paths import data_path @@ -15,8 +17,15 @@ def get_date_from_ms(ms): return datetime.datetime.fromtimestamp(ms / 1000.0) -def get_history(exchange_name, data_frequency, symbol, start_ms=None, - end_ms=None): +def get_seconds_from_date(date): + epoch = datetime.datetime.utcfromtimestamp(0) + epoch = epoch.replace(tzinfo=pytz.UTC) + + return int((date - epoch).total_seconds()) + + +def get_history(exchange_name, data_frequency, symbol, start_seconds=None, + end_seconds=None): """ History API provides OHLCV data for any of the supported exchanges up to yesterday. @@ -27,10 +36,10 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None, *** currently only 'daily' is supported *** :param symbol: string Required: The trading pair symbol. - :param start: float - Optional: The start date in milliseconds. - :param end: float - Optional: The end date in milliseconds. + :param start_seconds: int + Optional: The start date in seconds. + :param end_seconds: int + Optional: The end date in seconds. :return ohlcv: list[dict[string, float]] Each row contains the following dictionary for the resulting bars: @@ -67,11 +76,11 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None, data_frequency=data_frequency, ) - if start_ms: - url += '&start={}'.format(int(start_ms / 1000)) + if start_seconds: + url += '&start={}'.format(start_seconds) - if end_ms: - url += '&end={}'.format(int(end_ms / 1000)) + if end_seconds: + url += '&end={}'.format(end_seconds) try: response = requests.get(url) @@ -80,7 +89,7 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None, data = response.json() - if 'error' in response: + if 'error' in data: raise ValueError(response['error']) return data diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index ef06c438..471215d1 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -226,6 +226,16 @@ class Exchange: else: min_trade_size = 0.0000001 + if 'end_daily' in asset and asset['end_daily'] != 'N/A': + end_daily = pd.to_datetime(asset['end_daily'], utc=True) + else: + end_daily = None + + if 'end_minute' in asset and asset['end_minute'] != 'N/A': + end_minute = pd.to_datetime(asset['end_minute'], utc=True) + else: + end_minute = None + trading_pair = TradingPair( symbol=asset['symbol'], exchange=self.name, @@ -233,7 +243,10 @@ class Exchange: end_date=end_date, leverage=leverage, asset_name=asset_name, - min_trade_size=min_trade_size + min_trade_size=min_trade_size, + end_daily=end_daily, + end_minute=end_minute, + exchange_symbol=exchange_symbol ) self.assets[exchange_symbol] = trading_pair diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index a63aca45..c5404499 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -10,7 +10,8 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ BcolzMinuteBarWriter, BcolzMinuteBarReader from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader -from catalyst.exchange.bundle_utils import fetch_candles_chunk +from catalyst.exchange.bundle_utils import fetch_candles_chunk, get_history, \ + get_seconds_from_date from catalyst.exchange.exchange_utils import get_exchange_folder from catalyst.exchange.init_utils import get_exchange from catalyst.utils.cli import maybe_show_progress @@ -136,11 +137,12 @@ class ExchangeBundle: if len(os.listdir(output_dir)) > 0: self._writers[key] = BcolzDailyBarWriter.open(output_dir, end) else: + end_session = end.floor('1d') self._writers[key] = BcolzDailyBarWriter( filename=output_dir, calendar=open_calendar, start_session=start, - end_session=end + end_session=end_session ) else: raise ValueError( @@ -213,14 +215,26 @@ class ExchangeBundle: log.debug('the data chunk already exists') return - # TODO: ensure correct behavior for assets starting in the chunk - candles = fetch_candles_chunk( - exchange=self.exchange, - assets=missing_assets, - data_frequency=data_frequency, - end_dt=chunk_end, - bar_count=chunk['bar_count'] - ) + if data_frequency == 'minute': + # TODO: ensure correct behavior for assets starting in the chunk + candles = fetch_candles_chunk( + exchange=self.exchange, + assets=missing_assets, + data_frequency=data_frequency, + end_dt=chunk_end, + bar_count=chunk['bar_count'] + ) + else: + for asset in missing_assets: + # TODO: switch to Catalyst symbol convention + candles = get_history( + exchange_name=self.exchange.name, + data_frequency=data_frequency, + symbol=asset.exchange_symbol, + start_seconds=get_seconds_from_date(chunk_start), + end_seconds=get_seconds_from_date(chunk_end) + ) + pass num_candles = 0 data = [] diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 76696c3e..f0199425 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -8,7 +8,7 @@ log = Logger('test_exchange_bundle') class ExchangeBundleTestCase: - def test_ingest(self): + def test_ingest_minute(self): exchange_name = 'bitfinex' start = pd.to_datetime('2017-09-01', utc=True) @@ -26,3 +26,22 @@ class ExchangeBundleTestCase: show_progress=True ) pass + + def test_ingest_daily(self): + exchange_name = 'bitfinex' + + start = pd.to_datetime('2017-09-01', utc=True) + end = pd.Timestamp.utcnow() + + exchange_bundle = ExchangeBundle(exchange_name) + + log.info('ingesting exchange bundle {}'.format(exchange_name)) + exchange_bundle.ingest( + data_frequency='daily', + include_symbols='neo_btc', + exclude_symbols=None, + start=start, + end=end, + show_progress=True + ) + pass