Trying to fix an issue with merging new candles in get_history()

This commit is contained in:
fredfortier
2017-10-18 00:04:55 -04:00
parent 1a4dfe8abb
commit 74fd4a6a0f
8 changed files with 117 additions and 103 deletions
+3
View File
@@ -15,6 +15,7 @@ from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.exchange.exchange import Exchange
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
InvalidHistoryFrequencyError,
@@ -58,6 +59,8 @@ class Bitfinex(Exchange):
self.max_requests_per_minute = 20
self.request_cpt = dict()
self.bundle = ExchangeBundle(self)
def _request(self, operation, data, version='v1'):
payload_object = {
'request': '/{}/{}'.format(version, operation),
+3
View File
@@ -7,6 +7,7 @@ from six.moves import urllib
from catalyst.exchange.bittrex.bittrex_api import Bittrex_api
from catalyst.exchange.exchange import Exchange
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.exchange_errors import InvalidHistoryFrequencyError, \
ExchangeRequestError, InvalidOrderStyle, OrderNotFound, OrderCancelError, \
CreateOrderError
@@ -41,6 +42,8 @@ class Bittrex(Exchange):
self.assets = dict()
self.load_assets()
self.bundle = ExchangeBundle(self)
@property
def account(self):
pass
+6 -8
View File
@@ -1,11 +1,9 @@
import calendar
import tarfile
import shutil
import requests
from datetime import timedelta, datetime, date
import os
from logging import Logger
import pandas as pd
import numpy as np
@@ -19,7 +17,6 @@ from catalyst.exchange.exchange_utils import get_exchange_bundles_folder
from catalyst.utils.deprecate import deprecated
from catalyst.utils.paths import data_path
log = Logger('test_exchange_bundle')
EXCHANGE_NAMES = ['bitfinex', 'bittrex', 'poloniex']
API_URL = 'http://data.enigma.co/api/v1'
@@ -78,6 +75,12 @@ def get_delta(periods, data_frequency):
if data_frequency == 'minute' else timedelta(days=periods)
def get_periods_range(start_dt, end_dt, data_frequency):
freq = 'T' if data_frequency == 'minute' else 'D'
return pd.date_range(start_dt, end_dt, freq=freq)
def get_periods(start_dt, end_dt, data_frequency):
delta = end_dt - start_dt
@@ -127,14 +130,9 @@ def get_adj_dates(start, end, assets, data_frequency):
last_entry = end_asset
if start is None or earliest_trade > start:
log.debug(
'adjusting start date to earliest trade date found {}'.format(
earliest_trade
))
start = earliest_trade
if end is None or (last_entry is not None and end > last_entry):
log.debug('adjusting the end date to now {}'.format(last_entry))
end = last_entry
if start >= end:
+48 -73
View File
@@ -11,20 +11,18 @@ from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.data.data_portal import BASE_FIELDS
from catalyst.exchange import bundle_utils
from catalyst.exchange.bundle_utils import get_start_dt, \
get_delta, get_trailing_candles_dt, get_periods, get_adj_dates
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \
InvalidOrderStyle, BaseCurrencyNotFoundError, SymbolNotFoundOnExchange, \
InvalidHistoryFrequencyError
InvalidHistoryFrequencyError, MismatchingFrequencyError
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
ExchangeLimitOrder, ExchangeStopOrder
from catalyst.exchange.exchange_portfolio import ExchangePortfolio
from catalyst.exchange.exchange_utils import get_exchange_symbols
from catalyst.finance.order import ORDER_STATUS
from catalyst.finance.transaction import Transaction
from catalyst.utils.deprecate import deprecated
log = Logger('Exchange')
@@ -43,6 +41,7 @@ class Exchange:
self.num_candles_limit = None
self.max_requests_per_minute = None
self.request_cpt = None
self.bundle = ExchangeBundle(self)
@property
def positions(self):
@@ -367,55 +366,32 @@ class Exchange:
)
)
if field == 'price':
field = 'close'
# Don't use a timezone here
dt = pd.Timestamp.utcnow().floor('1 min')
value = None
if self.minute_reader is not None:
ohlc = self.get_candles(data_frequency, asset)
if field not in ohlc:
raise KeyError('Invalid column: %s' % field)
if self.minute_writer is not None:
df = pd.DataFrame(
[ohlc],
index=pd.DatetimeIndex([dt]),
columns=['open', 'high', 'low', 'close', 'volume']
)
try:
# Slight delay to minimize the chances that multiple algos
# might try to hit the cache at the exact same time.
sleep_time = random.uniform(0.5, 0.8)
sleep(sleep_time)
# TODO: This does not always! Why is that? Open an issue with zipline.
# See: https://github.com/zipline-live/zipline/issues/26
value = self.minute_reader.get_value(
# TODO: use victor's modified branch using int64
self.minute_writer.write_sid(
sid=asset.sid,
dt=dt,
field=field
df=df
)
log.debug('wrote minute data: {}'.format(dt))
except Exception as e:
log.warn('minute data not found: {}'.format(e))
if value is None or np.isnan(value):
ohlc = self.get_candles(data_frequency, asset)
if field not in ohlc:
raise KeyError('Invalid column: %s' % field)
if self.minute_writer is not None:
df = pd.DataFrame(
[ohlc],
index=pd.DatetimeIndex([dt]),
columns=['open', 'high', 'low', 'close', 'volume']
)
try:
# TODO: use victor's modified branch using int64
self.minute_writer.write_sid(
sid=asset.sid,
df=df
)
log.debug('wrote minute data: {}'.format(dt))
except Exception as e:
log.warn(
'unable to write minute data: {} {}'.format(dt, e))
log.warn(
'unable to write minute data: {} {}'.format(dt, e))
value = ohlc[field]
log.debug('got spot value: {}'.format(value))
else:
log.debug('got spot value from cache: {}'.format(value))
return value
@@ -462,8 +438,6 @@ class Exchange:
A dataframe containing the requested data.
"""
bundle = ExchangeBundle(self)
freq_match = re.match(r'([0-9].*)(m|M|d|D)', frequency, re.M | re.I)
if freq_match:
candle_size = int(freq_match.group(1))
@@ -474,11 +448,17 @@ class Exchange:
if unit.lower() == 'd':
if data_frequency != 'daily':
raise InvalidHistoryFrequencyError(frequency=frequency)
raise MismatchingFrequencyError(
frequency=frequency,
data_frequency=data_frequency
)
elif unit.lower() == 'm':
if data_frequency != 'minute':
raise InvalidHistoryFrequencyError(frequency=frequency)
raise MismatchingFrequencyError(
frequency=frequency,
data_frequency=data_frequency
)
else:
raise InvalidHistoryFrequencyError(frequency)
@@ -489,36 +469,30 @@ class Exchange:
start_dt, end_dt = get_adj_dates(start_dt, end_dt, assets,
data_frequency)
missing_assets = bundle.filter_existing_assets(
missing_assets = self.bundle.filter_existing_assets(
assets=assets,
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency
)
if len(missing_assets) > 0:
writer = bundle.get_writer(start_dt, end_dt, data_frequency)
chunks = bundle.prepare_chunks(
if missing_assets:
self.bundle.ingest_assets(
assets=assets,
data_frequency=data_frequency,
start_dt=start_dt,
end_dt=end_dt
end_dt=end_dt,
data_frequency=data_frequency
)
for chunk in chunks:
log.debug('ingesting chunk for pair {}, period {}'.format(
chunk['asset'],
chunk['period']
))
bundle.ingest_ctable(
asset=chunk['asset'],
data_frequency=data_frequency,
period=chunk['period'],
start_dt=chunk['period_start'],
end_dt=chunk['period_end'],
writer=writer
)
# We check again for data which may be too recent for the consolidated
# exchanges service
missing_assets = self.bundle.filter_existing_assets(
assets=assets,
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency
)
if missing_assets:
# Adding bars too recent to be contained in the consolidated
# exchanges bundles. We go directly against the exchange
# to retrieve the candles.
@@ -542,21 +516,22 @@ class Exchange:
end_dt=end_dt
)
bundle.ingest_candles(
# TODO: Do I need the previous_candle?
self.bundle.ingest_candles(
candles=candles,
bar_count=trailing_bar_count,
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency,
writer=writer
data_frequency=data_frequency
)
values = bundle.get_raw_arrays(
values = self.bundle.get_raw_arrays(
assets=assets,
fields=[field],
start_dt=start_dt,
end_dt=end_dt,
data_frequency=data_frequency
)[0]
)
series = dict()
for asset_index, asset in enumerate(assets):
@@ -565,7 +540,7 @@ class Exchange:
# TODO: use numpy to avoid the loop
date = start_dt
for value in values:
for value in values[0]:
all_dates.append(date)
asset_values.append(value[asset_index])
+43 -21
View File
@@ -14,7 +14,7 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
BcolzDailyBarReader
from catalyst.exchange.bundle_utils import get_ffill_candles, range_in_bundle, \
get_bcolz_chunk, get_delta, get_adj_dates, get_month_start_end, \
get_year_start_end
get_year_start_end, get_periods, get_periods_range
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
InvalidHistoryFrequencyError, PricingDataBeforeTradingError
from catalyst.exchange.exchange_utils import get_exchange_folder
@@ -207,7 +207,7 @@ class ExchangeBundle:
# This is workaround, there is an issue with empty
# session_label when using a newly created writer
del self._writers[data_frequency]
del self._writers[writer._rootdir]
writer = self.get_writer(writer._start_session,
writer._end_session, data_frequency)
@@ -217,8 +217,9 @@ class ExchangeBundle:
invalid_data_behavior='raise'
)
def ingest_candles(self, candles, bar_count, end_dt, data_frequency,
writer, previous_candle=dict()):
def ingest_candles(self, candles, bar_count, start_dt, end_dt,
data_frequency,
previous_candle=dict()):
"""
Ingest candles obtained via the get_candles API of an exchange.
@@ -235,6 +236,8 @@ class ExchangeBundle:
:return:
"""
writer = self.get_writer(start_dt, end_dt, data_frequency)
num_candles = 0
data = []
for asset in candles:
@@ -291,6 +294,9 @@ class ExchangeBundle:
path=None):
reader = self.get_reader(data_frequency, path)
if reader.last_available_dt < end_dt:
return []
if data_frequency == 'minute':
values = reader.load_raw_arrays(
fields=fields,
@@ -356,6 +362,9 @@ class ExchangeBundle:
path=path
)
if not arrays:
return path
ohlcv = dict(
open=arrays[0].flatten(),
high=arrays[1].flatten(),
@@ -446,7 +455,7 @@ class ExchangeBundle:
except PricingDataBeforeTradingError:
continue
sessions = self.calendar.sessions_in_range(asset_start, asset_end)
sessions = get_periods_range(asset_start, asset_end, 'daily')
periods = []
dt = sessions[0]
@@ -510,29 +519,22 @@ class ExchangeBundle:
return chunks
def ingest(self, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
def ingest_assets(self, assets, start_dt, end_dt, data_frequency,
show_progress=False):
"""
Determine if data is missing from the bundle and attempt to ingest it.
:param data_frequency:
:param include_symbols:
:param exclude_symbols:
:param start:
:param end:
:param show_progress:
:param environ:
:param assets:
:param start_dt:
:param end_dt:
:return:
"""
assets = self.get_assets(include_symbols, exclude_symbols)
start, end = get_adj_dates(start, end, assets, data_frequency)
writer = self.get_writer(start, end, data_frequency)
writer = self.get_writer(start_dt, end_dt, data_frequency)
chunks = self.prepare_chunks(
assets=assets,
data_frequency=data_frequency,
start_dt=start,
end_dt=end
start_dt=start_dt,
end_dt=end_dt
)
with maybe_show_progress(
chunks,
@@ -551,3 +553,23 @@ class ExchangeBundle:
writer=writer,
empty_rows_behavior='strip'
)
def ingest(self, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
"""
:param data_frequency:
:param include_symbols:
:param exclude_symbols:
:param start:
:param end:
:param show_progress:
:param environ:
:return:
"""
assets = self.get_assets(include_symbols, exclude_symbols)
start_dt, end_dt = get_adj_dates(start, end, assets, data_frequency)
self.ingest_assets(assets, start_dt, end_dt, data_frequency,
show_progress)
+8 -1
View File
@@ -78,7 +78,14 @@ class AlgoPickleNotFound(ZiplineError):
class InvalidHistoryFrequencyError(ZiplineError):
msg = (
'History frequency {frequency} not supported by the exchange.'
'Frequency {frequency} not supported by the exchange.'
).strip()
class MismatchingFrequencyError(ZiplineError):
msg = (
'Bar aggregate frequency {frequency} not compatible with '
'data frequency {data_frequency}.'
).strip()
+3
View File
@@ -15,6 +15,7 @@ from six import iteritems
from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.poloniex.poloniex_api import Poloniex_api
# from websocket import create_connection
@@ -51,6 +52,8 @@ class Poloniex(Exchange):
self.max_requests_per_minute = 20
self.request_cpt = dict()
self.bundle = ExchangeBundle(self)
def sanitize_curency_symbol(self, exchange_symbol):
"""
Helper method used to build the universal pair.
+3
View File
@@ -262,6 +262,9 @@ def _run(handle_data,
data_frequency='minute'
)
# TODO: use the constructor instead
sim_params._arena = 'live'
algorithm_class = partial(
ExchangeTradingAlgorithmLive,
exchanges=exchanges,