Unit testing ingestion of bundles logic

This commit is contained in:
fredfortier
2017-10-13 00:50:25 -04:00
parent e1c2f40ab9
commit c658d15fcb
3 changed files with 129 additions and 41 deletions
@@ -33,6 +33,13 @@ def initialize(context):
def _handle_data(context, data):
price = data.current(context.asset, 'close')
log.info('got price {price}'.format(price=price))
if price is None:
log.warn('no pricing data')
return
prices = data.history(
context.asset,
fields='price',
@@ -55,13 +62,6 @@ def _handle_data(context, data):
cash = context.portfolio.cash
log.info('base currency available: {cash}'.format(cash=cash))
price = data.current(context.asset, 'close')
log.info('got price {price}'.format(price=price))
if price is None:
log.warn('no pricing data')
return
record(price=price)
orders = get_open_orders(context.asset)
+92 -32
View File
@@ -1,6 +1,8 @@
import os
import time
from datetime import timedelta
import bcolz
import pandas as pd
from logbook import Logger, DEBUG, INFO
@@ -209,7 +211,38 @@ class ExchangeBundle:
return missing_assets
@deprecated
def _write(self, data, writer, data_frequency):
"""
Write data to the writer
:param df:
:param writer:
:return:
"""
try:
writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
)
except BcolzMinuteOverlappingData as e:
log.warn('chunk already exists: {}'.format(e))
except Exception as e:
log.warn('error when writing data: {}, trying again'.format(e))
# This is workaround, there is an issue with empty
# session_label when using a newly created writer
del self._writers[data_frequency]
# TODO: these are the dates of the chunk, not the job
writer = self.get_writer(writer._start_session,
writer._end_session, data_frequency)
writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
)
def ingest_chunk(self, bar_count, end_dt, data_frequency, asset,
writer, previous_candle=dict()):
"""
@@ -264,6 +297,7 @@ class ExchangeBundle:
index=all_dates,
columns=['open', 'high', 'low', 'close', 'volume']
)
if not df.empty:
df.sort_index(inplace=True)
@@ -272,41 +306,67 @@ class ExchangeBundle:
data.append((sid, df))
try:
log.debug(
'writing {num_candles} candles for {bar_count} bars'
'ending {end}'.format(
num_candles=num_candles,
bar_count=bar_count,
end=end_dt
)
)
writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
)
except BcolzMinuteOverlappingData as e:
log.warn('chunk already exists: {}'.format(e))
except Exception as e:
log.warn('error when writing data: {}, trying again'.format(e))
# This is workaround, there is an issue with empty
# session_label when using a newly created writer
del self._writers[data_frequency]
# TODO: these are the dates of the chunk, not the job
start_dt = get_start_dt(end_dt, bar_count, data_frequency)
writer = self.get_writer(start_dt, end_dt, data_frequency)
writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
log.debug(
'writing {num_candles} candles for {bar_count} bars'
'ending {end}'.format(
num_candles=num_candles,
bar_count=bar_count,
end=end_dt
)
)
self._write(data, writer, data_frequency)
return data
def ingest_ctable(self, asset, data_frequency, path):
start_time = time.time()
reader = BcolzMinuteBarReader(path)
start = reader.first_trading_day
end = reader.last_available_dt
open_calendar = get_calendar('OPEN')
periods = open_calendar.minutes_in_range(start, end)
sid = 284
arrays = reader.load_raw_arrays(
fields=['open', 'high', 'low', 'close', 'volume'],
start_dt=start,
end_dt=end,
sids=[sid]
)
ohlcv = dict(
open=arrays[0].flatten(),
high=arrays[1].flatten(),
low=arrays[2].flatten(),
close=arrays[3].flatten(),
volume=arrays[4].flatten()
)
df = pd.DataFrame(
data=ohlcv,
index=periods
)
data = []
if not df.empty:
df.sort_index(inplace=True)
data.append((sid, df))
writer = self.get_writer(start, end, data_frequency)
self._write(data, writer, data_frequency)
end_time = time.time()
delta_time = end_time - start_time
log.info('time elapsed {}'.format(delta_time))
pass
def ingest(self, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
+30 -2
View File
@@ -1,6 +1,8 @@
from datetime import timedelta
from datetime import timedelta, time
from logging import Logger
import bcolz
from toolz.itertoolz import join as joinz
import pandas as pd
from catalyst.exchange.exchange_bundle import ExchangeBundle
@@ -22,7 +24,7 @@ class ExchangeBundleTestCase:
log.info('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle.ingest(
data_frequency='minute',
include_symbols='neo_btc',
include_symbols='bcc_btc',
exclude_symbols=None,
start=start,
end=end,
@@ -67,3 +69,29 @@ class ExchangeBundleTestCase:
show_progress=True
)
pass
def test_merge_ctables(self):
exchange_name = 'bitfinex'
root = '/Users/fredfortier/.catalyst/data/exchanges/bitfinex/temp_bundles'
path = '00/02/000284.bcolz'
august = '{}/{}'.format(
root, 'poloniex-minute-btc_usdt-2017-8'
)
exchange = get_exchange(exchange_name)
asset = exchange.get_asset('btc_usd')
exchange_bundle = ExchangeBundle(exchange)
exchange_bundle.ingest_ctable(
asset=asset,
data_frequency='minute',
path=august
)
september = '{}/{}/{}'.format(
root, 'poloniex-minute-btc_usdt-2017-9', path
)
zseptember = bcolz.open(september, mode='a')
pass