From 4f80ebee57f5671a2212217f158ea4b2aca9806b Mon Sep 17 00:00:00 2001 From: fredfortier Date: Mon, 9 Oct 2017 14:50:51 -0400 Subject: [PATCH] Naive integration with the consolidated exchanges api --- catalyst/exchange/bundle_utils.py | 27 +++++++++++++------ catalyst/exchange/exchange_bundle.py | 40 +++++++++++++++++----------- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index fe71e025..57a2d6ea 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,6 +1,7 @@ import datetime, requests import os from logging import Logger +import pandas as pd import pytz @@ -24,8 +25,7 @@ def get_seconds_from_date(date): return int((date - epoch).total_seconds()) -def get_history(exchange_name, data_frequency, symbol, start_seconds=None, - end_seconds=None): +def get_history(exchange_name, data_frequency, symbol, start=None, end=None): """ History API provides OHLCV data for any of the supported exchanges up to yesterday. @@ -36,10 +36,10 @@ def get_history(exchange_name, data_frequency, symbol, start_seconds=None, *** currently only 'daily' is supported *** :param symbol: string Required: The trading pair symbol. - :param start_seconds: int - Optional: The start date in seconds. - :param end_seconds: int - Optional: The end date in seconds. + :param start: datetime + Optional: The start date. + :param end: datetime + Optional: The end date. :return ohlcv: list[dict[string, float]] Each row contains the following dictionary for the resulting bars: @@ -61,13 +61,18 @@ def get_history(exchange_name, data_frequency, symbol, start_seconds=None, forward fill missing bars outside of this function. """ + start_seconds = get_seconds_from_date(start) if start else None + end_seconds = get_seconds_from_date(end) if end else None + if exchange_name not in EXCHANGE_NAMES: raise ValueError( 'get_history function only supports the following exchanges: {}'.format( list(EXCHANGE_NAMES))) - if data_frequency != 'daily': - raise ValueError('get_history currently only supports daily data.') + if data_frequency != 'daily' and data_frequency != 'minute': + raise ValueError( + 'get_history currently only supports daily and minute data.' + ) url = '{api_url}/candles?exchange={exchange}&market={symbol}&freq={data_frequency}'.format( api_url=API_URL, @@ -92,6 +97,12 @@ def get_history(exchange_name, data_frequency, symbol, start_seconds=None, if 'error' in data: raise ValueError(response['error']) + for candle in data: + last_traded = pd.Timestamp.utcfromtimestamp(candle['ts']) + last_traded = last_traded.replace(tzinfo=pytz.UTC) + + candle['last_traded'] = last_traded + return data diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index c5404499..0b4d8fdd 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -215,26 +215,36 @@ class ExchangeBundle: log.debug('the data chunk already exists') return - if data_frequency == 'minute': - # TODO: ensure correct behavior for assets starting in the chunk - candles = fetch_candles_chunk( - exchange=self.exchange, - assets=missing_assets, - data_frequency=data_frequency, - end_dt=chunk_end, - bar_count=chunk['bar_count'] - ) - else: - for asset in missing_assets: + candles = dict() + for asset in missing_assets: + if chunk_start < asset.end_minute: + # TODO: fetch delta candles from exchanges + history_end = chunk_end \ + if chunk_end <= asset.end_minute else asset.end_minute + # TODO: switch to Catalyst symbol convention - candles = get_history( + candles[asset] = get_history( exchange_name=self.exchange.name, data_frequency=data_frequency, symbol=asset.exchange_symbol, - start_seconds=get_seconds_from_date(chunk_start), - end_seconds=get_seconds_from_date(chunk_end) + start=chunk_start, + end=history_end ) - pass + else: + log.debug( + 'no data in Catalyst api for chunk ' + '{} to {}'.format(chunk_start, chunk_end) + ) + # if data_frequency == 'minute': + # # TODO: ensure correct behavior for assets starting in the chunk + # candles = fetch_candles_chunk( + # exchange=self.exchange, + # assets=missing_assets, + # data_frequency=data_frequency, + # end_dt=chunk_end, + # bar_count=chunk['bar_count'] + # ) + # else: num_candles = 0 data = []