Implemented daily data loader and related fixes

This commit is contained in:
fredfortier
2017-10-17 03:00:36 -04:00
parent 1263fdd995
commit 9bdd8aba48
3 changed files with 106 additions and 63 deletions
+1 -1
View File
@@ -530,7 +530,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
self.add_exposure_stats(minute_stats)
print_df = pd.DataFrame(list(self.minute_stats))
log.debug(
log.info(
'statistics for the last {stats_minutes} minutes:\n{stats}'.format(
stats_minutes=self.stats_minutes,
stats=get_pretty_stats(
+90 -47
View File
@@ -13,7 +13,8 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
BcolzDailyBarReader
from catalyst.exchange.bundle_utils import get_ffill_candles, range_in_bundle, \
get_bcolz_chunk, get_delta
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
InvalidHistoryFrequencyError
from catalyst.exchange.exchange_utils import get_exchange_folder
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.paths import ensure_directory
@@ -106,8 +107,8 @@ class ExchangeBundle:
except IOError:
log.debug('no reader data found in {}'.format(input_dir))
else:
raise ValueError(
'invalid frequency {}'.format(data_frequency)
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
return self._readers[data_frequency]
@@ -186,8 +187,8 @@ class ExchangeBundle:
end_session=end_session
)
else:
raise ValueError(
'invalid frequency {}'.format(data_frequency)
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
return self._writers[key]
@@ -326,7 +327,7 @@ class ExchangeBundle:
"""
def ingest_ctable(self, asset, data_frequency, period, writer,
verify=False, cleanup=False):
empty_rows_behavior='warn', cleanup=False):
"""
Merge a ctable bundle chunk into the main bundle for the exchange.
@@ -334,7 +335,7 @@ class ExchangeBundle:
:param data_frequency: str
:param period: str
:param writer:
:param verify: bool
:param empty_rows_behavior: str
Ensure that the bundle does not have any missing data.
:param cleanup: bool
@@ -350,22 +351,41 @@ class ExchangeBundle:
period=period
)
reader = BcolzMinuteBarReader(path)
start = reader.first_trading_day
# end = reader.last_available_dt
end = reader.last_available_dt
periods = self.calendar.minutes_in_range(start, end)
sid = asset.sid
arrays = reader.load_raw_arrays(
fields=['open', 'high', 'low', 'close', 'volume'],
start_dt=start,
end_dt=end,
sids=[sid]
)
if data_frequency == 'minute':
reader = BcolzMinuteBarReader(path)
start = reader.first_trading_day
end = reader.last_available_dt
periods = self.calendar.minutes_in_range(start, end)
arrays = reader.load_raw_arrays(
fields=['open', 'high', 'low', 'close', 'volume'],
start_dt=start,
end_dt=end,
sids=[sid]
)
elif data_frequency == 'daily':
reader = BcolzDailyBarReader(path)
start = writer._start_session
end = writer._end_session
periods = self.calendar.sessions_in_range(start, end)
# Note that the parameters convention is totally different
# from the minute reader.
arrays = reader.load_raw_arrays(
columns=['open', 'high', 'low', 'close', 'volume'],
start_date=start,
end_date=end,
assets=[asset]
)
else:
raise InvalidHistoryFrequencyError(frequency=data_frequency)
ohlcv = dict(
open=arrays[0].flatten(),
@@ -380,7 +400,7 @@ class ExchangeBundle:
index=periods
)
if verify:
if empty_rows_behavior is not 'ignore':
nan_rows = df[df.isnull().T.any().T].index
if len(nan_rows) > 0:
@@ -404,19 +424,22 @@ class ExchangeBundle:
dates.append(pd.to_datetime(nan_rows.values[-1]))
name = path.split('/')[-1]
log.warn(
'\n{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}'.format(
if empty_rows_behavior == 'warn':
log.warn(
'\n{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}'.format(
name=name,
end_minute=asset.end_minute,
dates=dates
)
)
elif empty_rows_behavior == 'raise':
raise EmptyValuesInBundleError(
name=name,
end_minute=asset.end_minute,
dates=dates
)
)
raise EmptyValuesInBundleError(
name=name,
end_minute=asset.end_minute,
dates=dates
)
data = []
if not df.empty:
@@ -458,34 +481,53 @@ class ExchangeBundle:
periods = []
dt = sessions[0]
while dt <= sessions[-1]:
period = '{}-{}'.format(dt.year, dt.month)
period = '{}-{}'.format(dt.year, dt.month) \
if data_frequency == 'minute' else '{}'.format(dt.year)
if period not in periods:
periods.append(period)
month_range = calendar.monthrange(dt.year, dt.month)
month_start = pd.to_datetime(
datetime(dt.year, dt.month, 1, 0, 0, 0, 0),
utc=True)
if data_frequency == 'minute':
month_range = calendar.monthrange(dt.year, dt.month)
period_start = pd.to_datetime(
datetime(dt.year, dt.month, 1, 0, 0, 0, 0),
utc=True)
month_end = pd.to_datetime(
datetime(
dt.year, dt.month, month_range[1], 23, 59, 0, 0),
utc=True
)
period_end = pd.to_datetime(
datetime(
dt.year, dt.month, month_range[1], 23, 59, 0,
0),
utc=True
)
if month_end > asset_end:
month_end = asset_end
elif data_frequency == 'daily':
period_start = pd.to_datetime(
datetime(dt.year, 1, 1, 0, 0, 0, 0),
utc=True)
period_end = pd.to_datetime(
datetime(
dt.year, 12, 31, 23, 59, 0, 0),
utc=True
)
else:
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
if period_end > asset_end:
period_end = asset_end
has_data = \
range_in_bundle(asset, month_start, month_end, reader)
range_in_bundle(asset, period_start, period_end,
reader)
if not has_data:
log.debug('adding period: {}'.format(period))
chunks.append(
dict(
asset=asset,
period_end=month_end,
period_end=period_end,
period=period
)
)
@@ -532,5 +574,6 @@ class ExchangeBundle:
asset=chunk['asset'],
data_frequency=data_frequency,
period=chunk['period'],
writer=writer
writer=writer,
empty_rows_behavior='ignore'
)
+15 -15
View File
@@ -54,15 +54,15 @@ class ExchangeBundleTestCase:
def test_ingest_daily(self):
exchange_name = 'bitfinex'
start = pd.to_datetime('2017-09-01', utc=True)
end = pd.Timestamp.utcnow()
start = pd.to_datetime('2017-01-01', utc=True)
end = pd.to_datetime('2017-09-30', utc=True)
exchange_bundle = ExchangeBundle(get_exchange(exchange_name))
log.info('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle.ingest(
data_frequency='daily',
include_symbols='neo_btc',
include_symbols='neo_btc,bch_btc,eth_btc',
exclude_symbols=None,
start=start,
end=end,
@@ -71,19 +71,19 @@ class ExchangeBundleTestCase:
pass
def test_merge_ctables(self):
exchange_name = 'bitfinex'
exchange_name = 'poloniex'
data_frequency = 'minute'
exchange = get_exchange(exchange_name)
# asset = exchange.get_asset('gno_btc')
asset = exchange.get_asset('gno_btc')
start = pd.to_datetime('2017-5-1', utc=True)
end = pd.to_datetime('2017-5-31', utc=True)
# asset = exchange.get_asset('neo_btc')
#
# start = pd.to_datetime('2017-5-1', utc=True)
# end = pd.to_datetime('2017-5-31', utc=True)
asset = exchange.get_asset('neo_btc')
start = pd.to_datetime('2017-9-1', utc=True)
end = pd.to_datetime('2017-9-30', utc=True)
# start = pd.to_datetime('2017-9-1', utc=True)
# end = pd.to_datetime('2017-9-30', utc=True)
exchange_bundle = ExchangeBundle(exchange)
@@ -91,9 +91,9 @@ class ExchangeBundleTestCase:
exchange_bundle.ingest_ctable(
asset=asset,
data_frequency=data_frequency,
period='2017-9',
period='2017-5',
writer=writer,
verify=True
empty_rows_behavior='raise'
)
pass
@@ -111,7 +111,7 @@ class ExchangeBundleTestCase:
exchange_name=exchange_name,
symbol=asset.symbol,
data_frequency=data_frequency,
period='2017-5'
period='2017-5',
)
reader = BcolzMinuteBarReader(path)
pass