Improvements and fixes to the ingestion component

This commit is contained in:
fredfortier
2017-10-14 02:06:26 -04:00
parent c52653c84e
commit bdbaad1c91
4 changed files with 58 additions and 17 deletions
+3 -3
View File
@@ -6,7 +6,7 @@ from catalyst.api import symbol
def initialize(context):
print('initializing')
context.asset = symbol('btc_usdt')
context.asset = symbol('gno_btc')
def handle_data(context, data):
@@ -18,8 +18,8 @@ def handle_data(context, data):
run_algorithm(
capital_base=250,
start=pd.to_datetime('2017-1-1', utc=True),
end=pd.to_datetime('2017-1-31', utc=True),
start=pd.to_datetime('2017-5-1', utc=True),
end=pd.to_datetime('2017-5-31', utc=True),
data_frequency='minute',
initialize=initialize,
handle_data=handle_data,
+11 -1
View File
@@ -1,5 +1,5 @@
import gzip
import tarfile
import shutil
import requests
from datetime import timedelta, datetime
@@ -221,6 +221,16 @@ def get_ffill_candles(candles, bar_count, end_dt, data_frequency,
def range_in_bundle(asset, start_dt, end_dt, reader):
"""
Evaluate whether price data of an asset is included has been ingested in
the exchange bundle for the given date range.
:param asset:
:param start_dt:
:param end_dt:
:param reader:
:return:
"""
has_data = True
if has_data and reader is not None:
try:
+41 -11
View File
@@ -1,5 +1,7 @@
import calendar
import os
import shutil
import pytz
from datetime import timedelta, datetime
@@ -50,17 +52,20 @@ class ExchangeBundle:
else:
return self.exchange.get_assets()
def get_adj_dates(self, start, end, assets):
now = pd.Timestamp.utcnow()
if end is None or end > now:
log.debug('adjusting the end date to now {}'.format(now))
end = now
def get_adj_dates(self, start, end, assets, data_frequency):
earliest_trade = None
last_entry = None
for asset in assets:
if earliest_trade is None or earliest_trade > asset.start_date:
earliest_trade = asset.start_date
end_asset = asset.end_minute if data_frequency == 'minute' else \
asset.end_daily
if end_asset is not None and \
(last_entry is None or end_asset > last_entry):
last_entry = end_asset
if start is None or earliest_trade > start:
log.debug(
'adjusting start date to earliest trade date found {}'.format(
@@ -68,6 +73,10 @@ class ExchangeBundle:
))
start = earliest_trade
if end is None or (last_entry is not None and end > last_entry):
log.debug('adjusting the end date to now {}'.format(last_entry))
end = last_entry
if start >= end:
raise ValueError('start date cannot be after end date')
@@ -328,7 +337,7 @@ class ExchangeBundle:
"""
def ingest_ctable(self, asset, data_frequency, period, writer,
verify=False):
verify=False, cleanup=False):
"""
Merge a ctable bundle chunk into the main bundle for the exchange.
@@ -336,7 +345,12 @@ class ExchangeBundle:
:param data_frequency: str
:param period: str
:param writer:
:param verify:
:param verify: bool
Ensure that the bundle does not have any missing data.
:param cleanup: bool
Remove the temp bundle directory after ingestion.
:return:
"""
@@ -390,10 +404,15 @@ class ExchangeBundle:
data = []
if not df.empty:
df.sort_index(inplace=True)
data.append((sid, df))
self._write(data, writer, data_frequency)
if cleanup:
log.debug('removing bundle folder following '
'ingestion: {}'.format(path))
shutil.rmtree(path)
return path
def ingest(self, data_frequency, include_symbols=None,
@@ -412,15 +431,22 @@ class ExchangeBundle:
"""
assets = self.get_assets(include_symbols, exclude_symbols)
start, end = self.get_adj_dates(start, end, assets)
start, end = self.get_adj_dates(start, end, assets, data_frequency)
reader = self.get_reader(data_frequency)
chunks = []
periods = []
for asset in assets:
asset_start, asset_end = self.get_adj_dates(start, end, [asset])
try:
asset_start, asset_end = \
self.get_adj_dates(start, end, [asset], data_frequency)
except ValueError:
dt += timedelta(days=1)
continue
sessions = self.calendar.sessions_in_range(asset_start, asset_end)
periods = []
dt = sessions[0]
while dt <= sessions[-1]:
period = '{}-{}'.format(dt.year, dt.month)
@@ -438,6 +464,10 @@ class ExchangeBundle:
datetime(dt.year, dt.month, month_range[1] - 1,
23, 59, 0, 0),
utc=True)
if month_end > asset_end:
month_end = asset_end
has_data = \
range_in_bundle(asset, month_start, month_end, reader)
+3 -2
View File
@@ -21,7 +21,8 @@ class ExchangeBundleTestCase:
log.info('ingesting exchange bundle {}'.format(exchange_name))
exchange_bundle.ingest(
data_frequency='minute',
include_symbols='btc_usdt',
include_symbols='gno_btc',
# include_symbols=None,
exclude_symbols=None,
start=start,
end=end,
@@ -72,7 +73,7 @@ class ExchangeBundleTestCase:
data_frequency = 'minute'
exchange = get_exchange(exchange_name)
asset = exchange.get_asset('btc_usdt')
asset = exchange.get_asset('gno_btc')
start = pd.to_datetime('2017-5-1', utc=True)
end = pd.to_datetime('2017-5-31', utc=True)