Refactored the data portal to use the exchange bundles

This commit is contained in:
fredfortier
2017-10-08 01:13:47 -04:00
parent 0cc9d839d0
commit 3335ae0ea9
6 changed files with 247 additions and 253 deletions
+18 -7
View File
@@ -307,7 +307,7 @@ def catalyst_magic(line, cell=None):
'%s%%catalyst' % ((cell or '') and '%'),
# don't use system exit and propogate errors to the caller
standalone_mode=False,
)
)
except SystemExit as e:
# https://github.com/mitsuhiko/click/pull/533
# even in standalone_mode=False `--help` really wants to kill us ;_;
@@ -471,27 +471,38 @@ def live(ctx,
type=Date(tz='utc', as_timestamp=True),
help='The end date of the data range. (default: today)',
)
@click.option(
'--include-symbols',
default=None,
help='A list of symbols to ingest (optional comma separated list)',
)
@click.option(
'--exclude-symbols',
default=None,
help='A list of symbols to exclude from the ingestion '
'(optional comma separated list)',
)
@click.option(
'--show-progress/--no-show-progress',
default=True,
help='Print progress information to the terminal.'
)
def ingest_exchange(exchange_name, data_frequency, start, end,
show_progress):
include_symbols, exclude_symbols, show_progress):
"""
Ingest data for the given exchange.
"""
exchange_bundle = ExchangeBundle(exchange_name)
click.echo('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle = ExchangeBundle(
exchange_name=exchange_name,
exchange_bundle.ingest(
data_frequency=data_frequency,
include_symbols=None,
exclude_symbols=None,
include_symbols=include_symbols,
exclude_symbols=exclude_symbols,
start=start,
end=end,
show_progress=show_progress
)
exchange_bundle.ingest()
@main.command()
+33 -8
View File
@@ -1,14 +1,9 @@
import datetime
from logging import Logger, DEBUG
import os
from dateutil.relativedelta import relativedelta
import pandas as pd
from logging import Logger
from catalyst import get_calendar
from catalyst.data.minute_bars import BcolzMinuteBarWriter
from catalyst.data.us_equity_pricing import BcolzDailyBarWriter
from catalyst.exchange.exchange_utils import get_exchange_folder
from catalyst.utils.paths import data_root, ensure_directory
from catalyst.data.bundles import from_bundle_ingest_dirname
from catalyst.utils.paths import data_path
log = Logger('test_exchange_bundle')
@@ -95,3 +90,33 @@ def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count):
end_dt=end_dt
)
return candles
def find_most_recent_time(bundle_name):
"""
Find most recent "time folder" for a given bundle.
:param bundle_name:
The name of the targeted bundle.
:return folder:
The name of the time folder.
"""
try:
bundle_folders = os.listdir(
data_path([bundle_name]),
)
except OSError:
return None
most_recent_bundle = dict()
for folder in bundle_folders:
date = from_bundle_ingest_dirname(folder)
if not most_recent_bundle or date > \
most_recent_bundle[most_recent_bundle.keys()[0]]:
most_recent_bundle = dict()
most_recent_bundle[folder] = date
if most_recent_bundle:
return most_recent_bundle.keys()[0]
else:
return None
+11 -89
View File
@@ -12,24 +12,19 @@
# limitations under the License.
import abc
import os
from time import sleep
import pandas as pd
from catalyst.assets._assets import TradingPair
from logbook import Logger
from catalyst.data.bundles.core import from_bundle_ingest_dirname, \
minute_path, daily_path
from catalyst.data.data_portal import DataPortal
from catalyst.data.minute_bars import BcolzMinuteBarReader
from catalyst.data.us_equity_pricing import BcolzDailyBarReader
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangeBarDataError,
BundleNotFoundError, PricingDataBeforeTradingError,
PricingDataBeforeTradingError,
PricingDataNotLoadedError, InvalidHistoryFrequencyError)
from catalyst.utils.paths import data_path
log = Logger('DataPortalExchange')
@@ -259,34 +254,14 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
def __init__(self, *args, **kwargs):
super(DataPortalExchangeBacktest, self).__init__(*args, **kwargs)
self.daily_bar_readers = dict()
self.minute_bar_readers = dict()
self.exchange_bundles = dict()
self.history_loaders = dict()
self.minute_history_loaders = dict()
for exchange_name in self.exchanges:
name = 'exchange_{}'.format(exchange_name)
time_folder = \
DataPortalExchangeBacktest.find_most_recent_time(name)
if time_folder is None:
raise BundleNotFoundError(exchange=exchange_name)
try:
self.daily_bar_readers[exchange_name] = \
BcolzDailyBarReader(
daily_path(name, time_folder),
)
except IOError:
self.daily_bar_readers[exchange_name] = None
try:
self.minute_bar_readers[exchange_name] = \
BcolzMinuteBarReader(
minute_path(name, time_folder),
)
except IOError:
self.minute_bar_readers[exchange_name] = None
self.exchange_bundles[exchange_name] = \
ExchangeBundle(exchange_name)
def _get_first_trading_day(self, assets):
first_date = None
@@ -295,62 +270,6 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
first_date = asset.start_date
return first_date
@staticmethod
def find_most_recent_time(bundle_name):
"""
Find most recent "time folder" for a given bundle.
:param bundle_name:
The name of the targeted bundle.
:return folder:
The name of the time folder.
"""
try:
bundle_folders = os.listdir(
data_path([bundle_name]),
)
except OSError:
return None
most_recent_bundle = dict()
for folder in bundle_folders:
date = from_bundle_ingest_dirname(folder)
if not most_recent_bundle or date > \
most_recent_bundle[most_recent_bundle.keys()[0]]:
most_recent_bundle = dict()
most_recent_bundle[folder] = date
if most_recent_bundle:
return most_recent_bundle.keys()[0]
else:
return None
def _get_reader(self, data_frequency, exchange_name):
"""
Pick from a collection of readers based of exchange name and frequency.
:param data_frequency:
The reader frequency: minute, daily.
:param exchange_name:
The exchange name.
:return reader:
A reader object.
"""
if data_frequency == 'minute':
reader = self.minute_bar_readers[exchange_name]
elif data_frequency == 'daily':
reader = self.daily_bar_readers[exchange_name]
else:
raise InvalidHistoryFrequencyError(frequency=data_frequency)
if reader is None:
raise ValueError('reader not found')
return reader
def get_exchange_history_window(self,
exchange,
assets,
@@ -360,7 +279,9 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
field,
data_frequency,
ffill=True):
reader = self._get_reader(data_frequency, exchange.name)
bundle = self.exchange_bundles[exchange.name]
reader = bundle.get_reader(data_frequency)
if data_frequency == 'minute':
dts = self.trading_calendar.minutes_window(
end_dt, -bar_count
@@ -416,7 +337,8 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
def get_exchange_spot_value(self, exchange, assets, field, dt,
data_frequency):
reader = self._get_reader(data_frequency, exchange.name)
bundle = self.exchange_bundles[exchange.name]
reader = bundle.get_reader(data_frequency)
self.ensure_after_first_day(dt, assets)
+179 -145
View File
@@ -26,19 +26,12 @@ log = Logger('exchange_bundle')
class ExchangeBundle:
def __init__(self, exchange_name, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
def __init__(self, exchange_name, ):
self.exchange = get_exchange(exchange_name)
self.data_frequency = data_frequency
self.assets = self.get_assets(include_symbols, exclude_symbols)
self.start, self.end = self.get_adj_dates(start, end)
self.environ = environ
self.show_progress = show_progress
self.minutes_per_day = 1440
self.default_ohlc_ratio = 1000000
self._writer = None
self._reader = None
self._writers = dict()
self._readers = dict()
def get_assets(self, include_symbols, exclude_symbols):
# TODO: filter exclude symbols assets
@@ -50,14 +43,14 @@ class ExchangeBundle:
else:
return self.exchange.get_assets()
def get_adj_dates(self, start, end):
def get_adj_dates(self, start, end, assets):
now = pd.Timestamp.utcnow()
if end > now:
log.info('adjusting the end date to now {}'.format(now))
end = now
earliest_trade = None
for asset in self.assets:
for asset in assets:
if earliest_trade is None or earliest_trade > asset.start_date:
earliest_trade = asset.start_date
@@ -73,80 +66,90 @@ class ExchangeBundle:
return start, end
@property
def reader(self):
if self._reader is not None:
return self._reader
def get_reader(self, data_frequency):
"""
Get a data writer object, either a new object or from cache
:return: BcolzMinuteBarReader or BcolzDailyBarReader
"""
if data_frequency in self._readers:
return self._readers[data_frequency]
root = get_exchange_folder(self.exchange.name)
input_dir = BUNDLE_NAME_TEMPLATE.format(
root=root,
frequency=self.data_frequency
frequency=data_frequency
)
if self.data_frequency == 'minute':
if data_frequency == 'minute':
try:
self._reader = BcolzMinuteBarReader(input_dir)
self._readers[data_frequency] = BcolzMinuteBarReader(input_dir)
except IOError:
log.debug('no reader data found in {}'.format(input_dir))
elif self.data_frequency == 'daily':
elif data_frequency == 'daily':
try:
self._reader = BcolzDailyBarReader(input_dir)
self._readers[data_frequency] = BcolzDailyBarReader(input_dir)
except IOError:
log.debug('no reader data found in {}'.format(input_dir))
else:
raise ValueError(
'invalid frequency {}'.format(self.data_frequency)
'invalid frequency {}'.format(data_frequency)
)
return self._reader
return self._readers[data_frequency]
@property
def writer(self):
if self._writer is not None:
return self._writer
def get_writer(self, data_frequency, start, end):
"""
Get a data writer object, either a new object or from cache
:return: BcolzMinuteBarWriter or BcolzDailyBarWriter
"""
key = (data_frequency, start, end)
if key in self._writers:
return self._writers[key]
open_calendar = get_calendar('OPEN')
root = get_exchange_folder(self.exchange.name)
output_dir = BUNDLE_NAME_TEMPLATE.format(
root=root,
frequency=self.data_frequency
frequency=data_frequency
)
ensure_directory(output_dir)
if self.data_frequency == 'minute':
if data_frequency == 'minute':
if len(os.listdir(output_dir)) > 0:
self._writer = BcolzMinuteBarWriter.open(output_dir, self.end)
self._writers[key] = \
BcolzMinuteBarWriter.open(output_dir, end)
else:
self._writer = BcolzMinuteBarWriter(
self._writers[key] = BcolzMinuteBarWriter(
rootdir=output_dir,
calendar=open_calendar,
minutes_per_day=self.minutes_per_day,
start_session=self.start,
end_session=self.end,
start_session=start,
end_session=end,
write_metadata=True,
default_ohlc_ratio=self.default_ohlc_ratio
)
elif self.data_frequency == 'daily':
elif data_frequency == 'daily':
if len(os.listdir(output_dir)) > 0:
self._writer = BcolzDailyBarWriter.open(output_dir, self.end)
self._writers[key] = BcolzDailyBarWriter.open(output_dir, end)
else:
self._writer = BcolzDailyBarWriter(
self._writers[key] = BcolzDailyBarWriter(
filename=output_dir,
calendar=open_calendar,
start_session=self.start,
end_session=self.end
start_session=start,
end_session=end
)
else:
raise ValueError(
'invalid frequency {}'.format(self.data_frequency)
'invalid frequency {}'.format(data_frequency)
)
return self._writer
return self._writers[key]
def filter_existing_assets(self, assets, start, end):
def filter_existing_assets(self, assets, start, end, data_frequency):
"""
For each asset, get the close on the start and end dates of the chunk.
If the data exists, the chunk ingestion is complete.
@@ -161,20 +164,19 @@ class ExchangeBundle:
:return: list[TradingPair]
The assets missing from the bundle
"""
reader = self.get_reader(data_frequency)
missing_assets = []
for asset in assets:
has_data = True
if has_data and self.reader is not None:
if has_data and reader is not None:
try:
start_close = self.reader.get_value(
asset.sid, start, 'close')
start_close = reader.get_value(asset.sid, start, 'close')
if np.isnan(start_close):
has_data = False
else:
end_close = self.reader.get_value(
asset.sid, end, 'close')
end_close = reader.get_value(asset.sid, end, 'close')
if np.isnan(end_close):
has_data = False
@@ -190,36 +192,148 @@ class ExchangeBundle:
return missing_assets
def ingest(self):
def ingest_chunk(self, chunk, previous_candle, data_frequency, assets,
writer):
chunk_end = chunk['end']
chunk_start = chunk_end - timedelta(minutes=chunk['bar_count'])
chunk_assets = []
for asset in assets:
if asset.start_date <= chunk_end:
chunk_assets.append(asset)
missing_assets = self.filter_existing_assets(
assets=chunk_assets,
start=chunk_start,
end=chunk_end,
data_frequency=data_frequency
)
if len(missing_assets) == 0:
log.debug('the data chunk already exists')
return
# TODO: ensure correct behavior for assets starting in the chunk
candles = fetch_candles_chunk(
exchange=self.exchange,
assets=missing_assets,
data_frequency=data_frequency,
end_dt=chunk_end,
bar_count=chunk['bar_count']
)
num_candles = 0
data = []
for asset in candles:
asset_candles = candles[asset]
if not asset_candles:
log.debug(
'no data: {symbols} on {exchange}, date {end}'.format(
symbols=missing_assets,
exchange=self.exchange.name,
end=chunk_end
)
)
continue
all_dates = []
all_candles = []
date = chunk_start
while date <= chunk_end:
previous = previous_candle[asset] \
if asset in previous_candle else None
candle = next((candle for candle in asset_candles \
if candle['last_traded'] == date),
previous)
if candle is not None:
all_dates.append(date)
all_candles.append(candle)
previous_candle[asset] = candle
date += timedelta(minutes=1)
df = pd.DataFrame(all_candles, index=all_dates)
if not df.empty:
df.sort_index(inplace=True)
sid = asset.sid
num_candles += len(df.values)
data.append((sid, df))
try:
log.debug(
'writing {num_candles} candles from {start} to {end}'.format(
num_candles=num_candles,
start=chunk_start,
end=chunk_end
)
)
for pair in data:
log.debug('data for sid {}\n{}\n{}'.format(
pair[0], pair[1].head(2), pair[1].tail(2)))
writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
)
except BcolzMinuteOverlappingData as e:
log.warn('chunk already exists {}: {}'.format(chunk, e))
def ingest(self, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
"""
Ingest the bundle
:param data_frequency:
:param include_symbols:
:param exclude_symbols:
:param start:
:param end:
:param show_progress:
:param environ:
:return:
"""
assets = self.get_assets(include_symbols, exclude_symbols)
start, end = self.get_adj_dates(start, end, assets)
symbols = []
log.debug(
'ingesting trading pairs {symbols} on exchange {exchange} '
'from {start} to {end}'.format(
symbols=symbols,
exchange=self.exchange.name,
start=self.start,
end=self.end
start=start,
end=end
)
)
delta = self.end - self.start
if self.data_frequency == 'minute':
delta = end - start
if data_frequency == 'minute':
delta_periods = delta.total_seconds() / 60
frequency = '1m'
elif self.data_frequency == 'daily':
elif data_frequency == 'daily':
delta_periods = delta.total_seconds() / 60 / 60 / 24
frequency = '1d'
else:
raise ValueError('frequency not supported')
writer = self.get_writer(data_frequency, start, end)
if delta_periods > self.exchange.num_candles_limit:
bar_count = self.exchange.num_candles_limit
chunks = []
last_chunk_date = self.end.floor('1 min')
while last_chunk_date > self.start + timedelta(minutes=bar_count):
last_chunk_date = end.floor('1 min')
while last_chunk_date > start + timedelta(minutes=bar_count):
# TODO: account for the partial last bar
chunk = dict(end=last_chunk_date, bar_count=bar_count)
chunks.append(chunk)
@@ -231,102 +345,22 @@ class ExchangeBundle:
chunks.reverse()
else:
chunks = [dict(end=self.end, bar_count=delta_periods)]
chunks = [dict(end=end, bar_count=delta_periods)]
with maybe_show_progress(
chunks,
self.show_progress,
show_progress,
label='Fetching {exchange} {frequency} candles: '.format(
exchange=self.exchange.name,
frequency=self.data_frequency
frequency=data_frequency
)) as it:
previous_candle = dict()
for chunk in it:
chunk_end = chunk['end']
chunk_start = chunk_end - timedelta(minutes=chunk['bar_count'])
chunk_assets = []
for asset in self.assets:
if asset.start_date <= chunk_end:
chunk_assets.append(asset)
missing_assets = self.filter_existing_assets(
chunk_assets, chunk_start, chunk_end)
if len(missing_assets) == 0:
log.debug('the data chunk already exists')
continue
# TODO: ensure correct behavior for assets starting in the chunk
candles = fetch_candles_chunk(
exchange=self.exchange,
assets=missing_assets,
data_frequency=frequency,
end_dt=chunk_end,
bar_count=chunk['bar_count']
self.ingest_chunk(
chunk=chunk,
previous_candle=previous_candle,
data_frequency=data_frequency,
assets=assets,
writer=writer
)
num_candles = 0
data = []
for asset in candles:
asset_candles = candles[asset]
if not asset_candles:
log.debug(
'no data: {symbols} on {exchange}, date {end}'.format(
symbols=missing_assets,
exchange=self.exchange.name,
end=chunk_end
)
)
continue
all_dates = []
all_candles = []
date = chunk_start
while date <= chunk_end:
previous = previous_candle[asset] \
if asset in previous_candle else None
candle = next((candle for candle in asset_candles \
if candle['last_traded'] == date),
previous)
if candle is not None:
all_dates.append(date)
all_candles.append(candle)
previous_candle[asset] = candle
date += timedelta(minutes=1)
df = pd.DataFrame(all_candles, index=all_dates)
if not df.empty:
df.sort_index(inplace=True)
sid = asset.sid
num_candles += len(df.values)
data.append((sid, df))
try:
log.debug(
'writing {num_candles} candles from {start} to {end}'.format(
num_candles=num_candles,
start=chunk_start,
end=chunk_end
)
)
for pair in data:
log.debug('data for sid {}\n{}\n{}'.format(
pair[0], pair[1].head(2), pair[1].tail(2)))
self.writer.write(
data=data,
show_progress=False,
invalid_data_behavior='raise'
)
except BcolzMinuteOverlappingData as e:
log.warn('chunk already exists {}: {}'.format(chunk, e))
+3 -3
View File
@@ -14,9 +14,10 @@ class ExchangeBundleTestCase:
start = pd.to_datetime('2017-09-01', utc=True)
end = pd.Timestamp.utcnow()
exchange_bundle = ExchangeBundle(exchange_name)
log.info('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle = ExchangeBundle(
exchange_name=exchange_name,
exchange_bundle.ingest(
data_frequency='minute',
include_symbols='neo_btc',
exclude_symbols=None,
@@ -24,5 +25,4 @@ class ExchangeBundleTestCase:
end=end,
show_progress=True
)
exchange_bundle.ingest()
pass
+3 -1
View File
@@ -90,6 +90,8 @@ class ExchangeDataPortalTestCase:
'1m',
'close',
'minute')
log.info('found history window: {}'.format(data))
pass
def test_get_spot_value_backtest(self):
@@ -102,5 +104,5 @@ class ExchangeDataPortalTestCase:
date = pd.to_datetime('2017-09-10', utc=True)
value = self.data_portal_backtest.get_spot_value(
assets, 'close', date, 'minute')
log.info('found spot value {}'.format(value))
pass