Unit tested ingestion of bundle chunks. This may not be stable yet.

This commit is contained in:
fredfortier
2017-10-13 16:29:43 -04:00
parent c658d15fcb
commit 93f4d31399
6 changed files with 118 additions and 128 deletions
@@ -91,6 +91,7 @@ def _handle_data(context, data):
elif position.amount > 0 and \
price > cost_basis * (1 + context.PROFIT_TARGET):
profit = (price * position.amount) - (cost_basis * position.amount)
log.info('closing position, taking profit: {}'.format(profit))
order_target_percent(
asset=context.asset,
+34 -7
View File
@@ -1,3 +1,6 @@
import gzip
import tarfile
import requests
from datetime import timedelta, datetime
import os
@@ -8,7 +11,9 @@ import numpy as np
import pytz
from catalyst.data.bundles import from_bundle_ingest_dirname
from catalyst.data.bundles.core import download_without_progress
from catalyst.exchange.exchange_errors import ApiCandlesError
from catalyst.exchange.exchange_utils import get_exchange_bundles_folder
from catalyst.utils.deprecate import deprecated
from catalyst.utils.paths import data_path
@@ -29,21 +34,43 @@ def get_seconds_from_date(date):
return int((date - epoch).total_seconds())
def get_bcolz_chunk(exchange_name, data_frequency, symbol, period_a, period_b):
def get_bcolz_chunk(exchange_name, symbol, data_frequency, period):
"""
Download and extract a bcolz bundle.
:param exchange_name:
:param data_frequency:
:param symbol:
:param period_a:
Example: 2017
:param period_b:
Example: 10
:param data_frequency:
:param period:
:return:
Note:
Filename: bitfinex-daily-neo_eth-2017-10.tar.gz
:return:
"""
root = get_exchange_bundles_folder(exchange_name)
name = '{exchange}-{frequency}-{symbol}-{period}'.format(
exchange=exchange_name,
frequency=data_frequency,
symbol=symbol,
period=period
)
path = os.path.join(root, name)
if not os.path.isdir(path):
url = 'https://s3.amazonaws.com/enigmaco/catalyst-bundles/' \
'exchange-{exchange}/{name}.tar.gz'.format(
exchange=exchange_name,
name=name
)
bytes = download_without_progress(url)
with tarfile.open('r', fileobj=bytes) as tar:
tar.extractall(path)
return path
def get_history(exchange_name, data_frequency, symbol, start=None, end=None):
"""
History API provides OHLCV data for any of the supported exchanges up to yesterday.
+56 -105
View File
@@ -1,10 +1,10 @@
import calendar
import os
import time
from datetime import timedelta
from datetime import timedelta, datetime, date
import bcolz
import pandas as pd
from logbook import Logger, DEBUG, INFO
from logbook import Logger, INFO
from catalyst import get_calendar
from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
@@ -12,8 +12,9 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
BcolzDailyBarReader
from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \
get_periods, range_in_bundle
from catalyst.exchange.exchange_utils import get_exchange_folder
get_periods, range_in_bundle, get_bcolz_chunk
from catalyst.exchange.exchange_utils import get_exchange_folder, \
get_exchange_bundles_folder
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.deprecate import deprecated
from catalyst.utils.paths import ensure_directory
@@ -35,6 +36,7 @@ class ExchangeBundle:
self.default_ohlc_ratio = 1000000
self._writers = dict()
self._readers = dict()
self.calendar = get_calendar('OPEN')
def get_assets(self, include_symbols, exclude_symbols):
# TODO: filter exclude symbols assets
@@ -117,8 +119,6 @@ class ExchangeBundle:
if key in self._writers:
return self._writers[key]
open_calendar = get_calendar('OPEN')
root = get_exchange_folder(self.exchange.name)
output_dir = BUNDLE_NAME_TEMPLATE.format(
root=root,
@@ -159,7 +159,7 @@ class ExchangeBundle:
else:
self._writers[key] = BcolzMinuteBarWriter(
rootdir=output_dir,
calendar=open_calendar,
calendar=self.calendar,
minutes_per_day=self.minutes_per_day,
start_session=start_dt,
end_session=end_dt,
@@ -175,7 +175,7 @@ class ExchangeBundle:
end_session = end_dt.floor('1d')
self._writers[key] = BcolzDailyBarWriter(
filename=output_dir,
calendar=open_calendar,
calendar=self.calendar,
start_session=start_dt,
end_session=end_session
)
@@ -318,18 +318,31 @@ class ExchangeBundle:
return data
def ingest_ctable(self, asset, data_frequency, path):
def download_bundle(self, name):
"""
:param name:
:return:
"""
def ingest_ctable(self, asset, data_frequency, period, writer):
start_time = time.time()
path = get_bcolz_chunk(
exchange_name=self.exchange.name,
symbol=asset.symbol,
data_frequency=data_frequency,
period=period
)
reader = BcolzMinuteBarReader(path)
start = reader.first_trading_day
end = reader.last_available_dt
open_calendar = get_calendar('OPEN')
periods = open_calendar.minutes_in_range(start, end)
periods = self.calendar.minutes_in_range(start, end)
sid = 284
sid = asset.sid
arrays = reader.load_raw_arrays(
fields=['open', 'high', 'low', 'close', 'volume'],
start_dt=start,
@@ -352,12 +365,10 @@ class ExchangeBundle:
data = []
if not df.empty:
df.sort_index(inplace=True)
df.sort_index(inplace=True, ascending=False)
data.append((sid, df))
writer = self.get_writer(start, end, data_frequency)
self._write(data, writer, data_frequency)
end_time = time.time()
@@ -371,7 +382,6 @@ class ExchangeBundle:
exclude_symbols=None, start=None, end=None,
show_progress=True, environ=os.environ):
"""
Ingest the bundle
:param data_frequency:
:param include_symbols:
@@ -385,113 +395,54 @@ class ExchangeBundle:
assets = self.get_assets(include_symbols, exclude_symbols)
start, end = self.get_adj_dates(start, end, assets)
symbols = list(map(lambda asset: asset.symbol, assets))
log.info(
'ingesting trading pairs {symbols} on exchange {exchange} '
'from {start} to {end}'.format(
symbols=symbols,
exchange=self.exchange.name,
start=start,
end=end
)
)
writer = self.get_writer(start, end, data_frequency)
reader = self.get_reader(data_frequency)
all_chunks = []
chunks = []
periods = []
for asset in assets:
try:
asset_start, asset_end = \
self.get_adj_dates(start, end, [asset])
asset_start, asset_end = self.get_adj_dates(start, end, [asset])
sessions = self.calendar.sessions_in_range(asset_start, asset_end)
except ValueError as e:
log.debug('asset outside of range {} {}'.format(asset, e))
continue
dt = sessions[0]
while dt <= sessions[-1]:
period = '{}-{}'.format(dt.year, dt.month)
asset_periods = get_periods(asset_start, asset_end, data_frequency)
if asset_periods > self.exchange.num_candles_limit:
bar_count = self.exchange.num_candles_limit
if period not in periods:
periods.append(period)
chunks = []
month_range = calendar.monthrange(dt.year, dt.month)
month_start = date(dt.year, dt.month, month_range[0])
month_end = date(dt.year, dt.month, month_range[1])
period_delta = timedelta(minutes=1) \
if data_frequency == 'minute' else \
timedelta(days=1)
chunk_start = asset_start.floor('1 min') - period_delta
while chunk_start < asset_end:
delta = timedelta(minutes=bar_count) \
if data_frequency == 'minute' else \
timedelta(days=bar_count)
chunk_end = chunk_start + delta \
if chunk_start + delta < asset_end else asset_end
chunk_periods = \
get_periods(chunk_start, chunk_end, data_frequency)
range_start = \
get_start_dt(chunk_end, chunk_periods, data_frequency)
if range_in_bundle(asset, range_start, chunk_end, reader):
log.debug(
'chunk already ingested {symbol} '
'{start} to {end}'.format(
symbol=asset.symbol,
start=range_start,
end=chunk_end
if not range_in_bundle(asset, month_start, month_end,
reader):
log.debug('adding period: {}'.format(period))
chunks.append(
dict(
asset=asset,
period_end=month_end,
period=period
)
)
chunk_start = chunk_end + period_delta
continue
dt += timedelta(days=1)
chunk = dict(
asset=asset,
end=chunk_end,
bar_count=chunk_periods
)
chunks.append(chunk)
chunk_start = chunk_end + period_delta
all_chunks += chunks
else:
if range_in_bundle(asset, asset_start, asset_end, reader):
log.debug(
'asset already ingested {symbol} '
'{start} to {end}'.format(
symbol=asset.symbol,
start=asset_start,
end=asset_end
)
)
continue
all_chunks += [
dict(asset=asset, end=asset_end, bar_count=asset_periods)
]
all_chunks.sort(key=lambda chunk: chunk['end'])
chunks.sort(key=lambda chunk: chunk['period_end'])
writer = self.get_writer(start, end, data_frequency)
with maybe_show_progress(
all_chunks,
chunks,
show_progress,
label='Fetching {exchange} {frequency} candles: '.format(
exchange=self.exchange.name,
frequency=data_frequency
)) as it:
previous_candle = dict()
for chunk in it:
self.ingest_chunk(
bar_count=chunk['bar_count'],
end_dt=chunk['end'],
data_frequency=data_frequency,
self.ingest_ctable(
asset=chunk['asset'],
writer=writer,
previous_candle=previous_candle,
data_frequency=data_frequency,
period=chunk['period'],
writer=writer
)
pass
+8
View File
@@ -162,6 +162,14 @@ def get_exchange_minute_writer_root(exchange_name, environ=None):
return minute_data_folder
def get_exchange_bundles_folder(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
temp_bundles = os.path.join(exchange_folder, 'temp_bundles')
ensure_directory(temp_bundles)
return temp_bundles
def perf_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
+8
View File
@@ -2,6 +2,7 @@ from catalyst.exchange.bitfinex.bitfinex import Bitfinex
from catalyst.exchange.bittrex.bittrex import Bittrex
from catalyst.exchange.exchange_errors import ExchangeNotFoundError
from catalyst.exchange.exchange_utils import get_exchange_auth
from catalyst.exchange.poloniex.poloniex import Poloniex
def get_exchange(exchange_name):
@@ -20,5 +21,12 @@ def get_exchange(exchange_name):
base_currency=None,
portfolio=None
)
elif exchange_name == 'poloniex':
return Poloniex(
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=None,
portfolio=None
)
else:
raise ExchangeNotFoundError(exchange_name=exchange_name)
+11 -16
View File
@@ -71,27 +71,22 @@ class ExchangeBundleTestCase:
pass
def test_merge_ctables(self):
exchange_name = 'bitfinex'
exchange_name = 'poloniex'
data_frequency = 'minute'
root = '/Users/fredfortier/.catalyst/data/exchanges/bitfinex/temp_bundles'
path = '00/02/000284.bcolz'
august = '{}/{}'.format(
root, 'poloniex-minute-btc_usdt-2017-8'
)
exchange = get_exchange(exchange_name)
asset = exchange.get_asset('btc_usd')
asset = exchange.get_asset('btc_usdt')
start = pd.to_datetime('2017-09-01', utc=True)
end = pd.to_datetime('2017-09-06', utc=True)
exchange_bundle = ExchangeBundle(exchange)
writer = exchange_bundle.get_writer(start, end, data_frequency)
exchange_bundle.ingest_ctable(
asset=asset,
data_frequency='minute',
path=august
data_frequency=data_frequency,
period='2017-9',
writer=writer
)
september = '{}/{}/{}'.format(
root, 'poloniex-minute-btc_usdt-2017-9', path
)
zseptember = bcolz.open(september, mode='a')
pass