From 74fd4a6a0f502e9e2be0112f3294b2da87bd9cf1 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Wed, 18 Oct 2017 00:04:55 -0400 Subject: [PATCH] Trying to fix an issue with merging new candles in get_history() --- catalyst/exchange/bitfinex/bitfinex.py | 3 + catalyst/exchange/bittrex/bittrex.py | 3 + catalyst/exchange/bundle_utils.py | 14 ++- catalyst/exchange/exchange.py | 121 ++++++++++--------------- catalyst/exchange/exchange_bundle.py | 64 ++++++++----- catalyst/exchange/exchange_errors.py | 9 +- catalyst/exchange/poloniex/poloniex.py | 3 + catalyst/utils/run_algo.py | 3 + 8 files changed, 117 insertions(+), 103 deletions(-) diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py index d866e912..1817ea2d 100644 --- a/catalyst/exchange/bitfinex/bitfinex.py +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -15,6 +15,7 @@ from catalyst.assets._assets import TradingPair from logbook import Logger from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import ( ExchangeRequestError, InvalidHistoryFrequencyError, @@ -58,6 +59,8 @@ class Bitfinex(Exchange): self.max_requests_per_minute = 20 self.request_cpt = dict() + self.bundle = ExchangeBundle(self) + def _request(self, operation, data, version='v1'): payload_object = { 'request': '/{}/{}'.format(version, operation), diff --git a/catalyst/exchange/bittrex/bittrex.py b/catalyst/exchange/bittrex/bittrex.py index 2979a71c..dcc9b943 100644 --- a/catalyst/exchange/bittrex/bittrex.py +++ b/catalyst/exchange/bittrex/bittrex.py @@ -7,6 +7,7 @@ from six.moves import urllib from catalyst.exchange.bittrex.bittrex_api import Bittrex_api from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \ ExchangeRequestError, InvalidOrderStyle, OrderNotFound, OrderCancelError, \ CreateOrderError @@ -41,6 +42,8 @@ class Bittrex(Exchange): self.assets = dict() self.load_assets() + self.bundle = ExchangeBundle(self) + @property def account(self): pass diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index e5f970d8..ebf39d76 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,11 +1,9 @@ import calendar import tarfile -import shutil import requests from datetime import timedelta, datetime, date import os -from logging import Logger import pandas as pd import numpy as np @@ -19,7 +17,6 @@ from catalyst.exchange.exchange_utils import get_exchange_bundles_folder from catalyst.utils.deprecate import deprecated from catalyst.utils.paths import data_path -log = Logger('test_exchange_bundle') EXCHANGE_NAMES = ['bitfinex', 'bittrex', 'poloniex'] API_URL = 'http://data.enigma.co/api/v1' @@ -78,6 +75,12 @@ def get_delta(periods, data_frequency): if data_frequency == 'minute' else timedelta(days=periods) +def get_periods_range(start_dt, end_dt, data_frequency): + freq = 'T' if data_frequency == 'minute' else 'D' + + return pd.date_range(start_dt, end_dt, freq=freq) + + def get_periods(start_dt, end_dt, data_frequency): delta = end_dt - start_dt @@ -127,14 +130,9 @@ def get_adj_dates(start, end, assets, data_frequency): last_entry = end_asset if start is None or earliest_trade > start: - log.debug( - 'adjusting start date to earliest trade date found {}'.format( - earliest_trade - )) start = earliest_trade if end is None or (last_entry is not None and end > last_entry): - log.debug('adjusting the end date to now {}'.format(last_entry)) end = last_entry if start >= end: diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 93cc128b..34093111 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -11,20 +11,18 @@ from catalyst.assets._assets import TradingPair from logbook import Logger from catalyst.data.data_portal import BASE_FIELDS -from catalyst.exchange import bundle_utils from catalyst.exchange.bundle_utils import get_start_dt, \ get_delta, get_trailing_candles_dt, get_periods, get_adj_dates from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \ InvalidOrderStyle, BaseCurrencyNotFoundError, SymbolNotFoundOnExchange, \ - InvalidHistoryFrequencyError + InvalidHistoryFrequencyError, MismatchingFrequencyError from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \ ExchangeLimitOrder, ExchangeStopOrder from catalyst.exchange.exchange_portfolio import ExchangePortfolio from catalyst.exchange.exchange_utils import get_exchange_symbols from catalyst.finance.order import ORDER_STATUS from catalyst.finance.transaction import Transaction -from catalyst.utils.deprecate import deprecated log = Logger('Exchange') @@ -43,6 +41,7 @@ class Exchange: self.num_candles_limit = None self.max_requests_per_minute = None self.request_cpt = None + self.bundle = ExchangeBundle(self) @property def positions(self): @@ -367,55 +366,32 @@ class Exchange: ) ) - if field == 'price': - field = 'close' - # Don't use a timezone here dt = pd.Timestamp.utcnow().floor('1 min') - value = None - if self.minute_reader is not None: + ohlc = self.get_candles(data_frequency, asset) + if field not in ohlc: + raise KeyError('Invalid column: %s' % field) + + if self.minute_writer is not None: + df = pd.DataFrame( + [ohlc], + index=pd.DatetimeIndex([dt]), + columns=['open', 'high', 'low', 'close', 'volume'] + ) + try: - # Slight delay to minimize the chances that multiple algos - # might try to hit the cache at the exact same time. - sleep_time = random.uniform(0.5, 0.8) - sleep(sleep_time) - # TODO: This does not always! Why is that? Open an issue with zipline. - # See: https://github.com/zipline-live/zipline/issues/26 - value = self.minute_reader.get_value( + # TODO: use victor's modified branch using int64 + self.minute_writer.write_sid( sid=asset.sid, - dt=dt, - field=field + df=df ) + log.debug('wrote minute data: {}'.format(dt)) except Exception as e: - log.warn('minute data not found: {}'.format(e)) - - if value is None or np.isnan(value): - ohlc = self.get_candles(data_frequency, asset) - if field not in ohlc: - raise KeyError('Invalid column: %s' % field) - - if self.minute_writer is not None: - df = pd.DataFrame( - [ohlc], - index=pd.DatetimeIndex([dt]), - columns=['open', 'high', 'low', 'close', 'volume'] - ) - - try: - # TODO: use victor's modified branch using int64 - self.minute_writer.write_sid( - sid=asset.sid, - df=df - ) - log.debug('wrote minute data: {}'.format(dt)) - except Exception as e: - log.warn( - 'unable to write minute data: {} {}'.format(dt, e)) + log.warn( + 'unable to write minute data: {} {}'.format(dt, e)) value = ohlc[field] log.debug('got spot value: {}'.format(value)) - else: - log.debug('got spot value from cache: {}'.format(value)) return value @@ -462,8 +438,6 @@ class Exchange: A dataframe containing the requested data. """ - bundle = ExchangeBundle(self) - freq_match = re.match(r'([0-9].*)(m|M|d|D)', frequency, re.M | re.I) if freq_match: candle_size = int(freq_match.group(1)) @@ -474,11 +448,17 @@ class Exchange: if unit.lower() == 'd': if data_frequency != 'daily': - raise InvalidHistoryFrequencyError(frequency=frequency) + raise MismatchingFrequencyError( + frequency=frequency, + data_frequency=data_frequency + ) elif unit.lower() == 'm': if data_frequency != 'minute': - raise InvalidHistoryFrequencyError(frequency=frequency) + raise MismatchingFrequencyError( + frequency=frequency, + data_frequency=data_frequency + ) else: raise InvalidHistoryFrequencyError(frequency) @@ -489,36 +469,30 @@ class Exchange: start_dt, end_dt = get_adj_dates(start_dt, end_dt, assets, data_frequency) - missing_assets = bundle.filter_existing_assets( + missing_assets = self.bundle.filter_existing_assets( assets=assets, start_dt=start_dt, end_dt=end_dt, data_frequency=data_frequency ) - if len(missing_assets) > 0: - writer = bundle.get_writer(start_dt, end_dt, data_frequency) - - chunks = bundle.prepare_chunks( + if missing_assets: + self.bundle.ingest_assets( assets=assets, - data_frequency=data_frequency, start_dt=start_dt, - end_dt=end_dt + end_dt=end_dt, + data_frequency=data_frequency ) - for chunk in chunks: - log.debug('ingesting chunk for pair {}, period {}'.format( - chunk['asset'], - chunk['period'] - )) - bundle.ingest_ctable( - asset=chunk['asset'], - data_frequency=data_frequency, - period=chunk['period'], - start_dt=chunk['period_start'], - end_dt=chunk['period_end'], - writer=writer - ) + # We check again for data which may be too recent for the consolidated + # exchanges service + missing_assets = self.bundle.filter_existing_assets( + assets=assets, + start_dt=start_dt, + end_dt=end_dt, + data_frequency=data_frequency + ) + if missing_assets: # Adding bars too recent to be contained in the consolidated # exchanges bundles. We go directly against the exchange # to retrieve the candles. @@ -542,21 +516,22 @@ class Exchange: end_dt=end_dt ) - bundle.ingest_candles( + # TODO: Do I need the previous_candle? + self.bundle.ingest_candles( candles=candles, bar_count=trailing_bar_count, + start_dt=start_dt, end_dt=end_dt, - data_frequency=data_frequency, - writer=writer + data_frequency=data_frequency ) - values = bundle.get_raw_arrays( + values = self.bundle.get_raw_arrays( assets=assets, fields=[field], start_dt=start_dt, end_dt=end_dt, data_frequency=data_frequency - )[0] + ) series = dict() for asset_index, asset in enumerate(assets): @@ -565,7 +540,7 @@ class Exchange: # TODO: use numpy to avoid the loop date = start_dt - for value in values: + for value in values[0]: all_dates.append(date) asset_values.append(value[asset_index]) diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index dbc42c0b..6198cdc3 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -14,7 +14,7 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ BcolzDailyBarReader from catalyst.exchange.bundle_utils import get_ffill_candles, range_in_bundle, \ get_bcolz_chunk, get_delta, get_adj_dates, get_month_start_end, \ - get_year_start_end + get_year_start_end, get_periods, get_periods_range from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \ InvalidHistoryFrequencyError, PricingDataBeforeTradingError from catalyst.exchange.exchange_utils import get_exchange_folder @@ -207,7 +207,7 @@ class ExchangeBundle: # This is workaround, there is an issue with empty # session_label when using a newly created writer - del self._writers[data_frequency] + del self._writers[writer._rootdir] writer = self.get_writer(writer._start_session, writer._end_session, data_frequency) @@ -217,8 +217,9 @@ class ExchangeBundle: invalid_data_behavior='raise' ) - def ingest_candles(self, candles, bar_count, end_dt, data_frequency, - writer, previous_candle=dict()): + def ingest_candles(self, candles, bar_count, start_dt, end_dt, + data_frequency, + previous_candle=dict()): """ Ingest candles obtained via the get_candles API of an exchange. @@ -235,6 +236,8 @@ class ExchangeBundle: :return: """ + writer = self.get_writer(start_dt, end_dt, data_frequency) + num_candles = 0 data = [] for asset in candles: @@ -291,6 +294,9 @@ class ExchangeBundle: path=None): reader = self.get_reader(data_frequency, path) + if reader.last_available_dt < end_dt: + return [] + if data_frequency == 'minute': values = reader.load_raw_arrays( fields=fields, @@ -356,6 +362,9 @@ class ExchangeBundle: path=path ) + if not arrays: + return path + ohlcv = dict( open=arrays[0].flatten(), high=arrays[1].flatten(), @@ -446,7 +455,7 @@ class ExchangeBundle: except PricingDataBeforeTradingError: continue - sessions = self.calendar.sessions_in_range(asset_start, asset_end) + sessions = get_periods_range(asset_start, asset_end, 'daily') periods = [] dt = sessions[0] @@ -510,29 +519,22 @@ class ExchangeBundle: return chunks - def ingest(self, data_frequency, include_symbols=None, - exclude_symbols=None, start=None, end=None, - show_progress=True, environ=os.environ): + def ingest_assets(self, assets, start_dt, end_dt, data_frequency, + show_progress=False): """ + Determine if data is missing from the bundle and attempt to ingest it. - :param data_frequency: - :param include_symbols: - :param exclude_symbols: - :param start: - :param end: - :param show_progress: - :param environ: + :param assets: + :param start_dt: + :param end_dt: :return: """ - assets = self.get_assets(include_symbols, exclude_symbols) - start, end = get_adj_dates(start, end, assets, data_frequency) - - writer = self.get_writer(start, end, data_frequency) + writer = self.get_writer(start_dt, end_dt, data_frequency) chunks = self.prepare_chunks( assets=assets, data_frequency=data_frequency, - start_dt=start, - end_dt=end + start_dt=start_dt, + end_dt=end_dt ) with maybe_show_progress( chunks, @@ -551,3 +553,23 @@ class ExchangeBundle: writer=writer, empty_rows_behavior='strip' ) + + def ingest(self, data_frequency, include_symbols=None, + exclude_symbols=None, start=None, end=None, + show_progress=True, environ=os.environ): + """ + + :param data_frequency: + :param include_symbols: + :param exclude_symbols: + :param start: + :param end: + :param show_progress: + :param environ: + :return: + """ + assets = self.get_assets(include_symbols, exclude_symbols) + start_dt, end_dt = get_adj_dates(start, end, assets, data_frequency) + + self.ingest_assets(assets, start_dt, end_dt, data_frequency, + show_progress) diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py index b6c51155..3a3b6747 100644 --- a/catalyst/exchange/exchange_errors.py +++ b/catalyst/exchange/exchange_errors.py @@ -78,7 +78,14 @@ class AlgoPickleNotFound(ZiplineError): class InvalidHistoryFrequencyError(ZiplineError): msg = ( - 'History frequency {frequency} not supported by the exchange.' + 'Frequency {frequency} not supported by the exchange.' + ).strip() + + +class MismatchingFrequencyError(ZiplineError): + msg = ( + 'Bar aggregate frequency {frequency} not compatible with ' + 'data frequency {data_frequency}.' ).strip() diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index ca13e951..e24acc75 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -15,6 +15,7 @@ from six import iteritems from catalyst.assets._assets import TradingPair from logbook import Logger +from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.poloniex.poloniex_api import Poloniex_api # from websocket import create_connection @@ -51,6 +52,8 @@ class Poloniex(Exchange): self.max_requests_per_minute = 20 self.request_cpt = dict() + self.bundle = ExchangeBundle(self) + def sanitize_curency_symbol(self, exchange_symbol): """ Helper method used to build the universal pair. diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 1cb2bd66..bf149642 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -262,6 +262,9 @@ def _run(handle_data, data_frequency='minute' ) + # TODO: use the constructor instead + sim_params._arena = 'live' + algorithm_class = partial( ExchangeTradingAlgorithmLive, exchanges=exchanges,