mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 11:59:37 +08:00
Tested ingestion of minute data with a single market
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
import pandas as pd
|
||||
|
||||
from catalyst import run_algorithm
|
||||
from catalyst.api import symbol
|
||||
|
||||
|
||||
def initialize(context):
|
||||
print('initializing')
|
||||
context.asset = symbol('btc_usdt')
|
||||
|
||||
|
||||
def handle_data(context, data):
|
||||
print('handling bar: {}'.format(data.current_dt))
|
||||
|
||||
price = data.current(context.asset, 'close')
|
||||
print('got price {price}'.format(price=price))
|
||||
|
||||
|
||||
run_algorithm(
|
||||
capital_base=250,
|
||||
start=pd.to_datetime('2017-1-1', utc=True),
|
||||
end=pd.to_datetime('2017-1-31', utc=True),
|
||||
data_frequency='minute',
|
||||
initialize=initialize,
|
||||
handle_data=handle_data,
|
||||
analyze=None,
|
||||
exchange_name='poloniex',
|
||||
algo_namespace='simple_loop',
|
||||
base_currency='btc'
|
||||
)
|
||||
@@ -236,7 +236,7 @@ def range_in_bundle(asset, start_dt, end_dt, reader):
|
||||
if np.isnan(end_close):
|
||||
has_data = False
|
||||
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
has_data = False
|
||||
|
||||
else:
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import calendar
|
||||
import os
|
||||
import time
|
||||
from datetime import timedelta, datetime, date
|
||||
import pytz
|
||||
from datetime import timedelta, datetime
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from logbook import Logger, INFO
|
||||
|
||||
from catalyst import get_calendar
|
||||
@@ -13,6 +14,7 @@ from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
|
||||
BcolzDailyBarReader
|
||||
from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \
|
||||
get_periods, range_in_bundle, get_bcolz_chunk
|
||||
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder, \
|
||||
get_exchange_bundles_folder
|
||||
from catalyst.utils.cli import maybe_show_progress
|
||||
@@ -325,8 +327,18 @@ class ExchangeBundle:
|
||||
:return:
|
||||
"""
|
||||
|
||||
def ingest_ctable(self, asset, data_frequency, period, writer):
|
||||
start_time = time.time()
|
||||
def ingest_ctable(self, asset, data_frequency, period, writer,
|
||||
verify=False):
|
||||
"""
|
||||
Merge a ctable bundle chunk into the main bundle for the exchange.
|
||||
|
||||
:param asset: TradingPair
|
||||
:param data_frequency: str
|
||||
:param period: str
|
||||
:param writer:
|
||||
:param verify:
|
||||
:return:
|
||||
"""
|
||||
|
||||
path = get_bcolz_chunk(
|
||||
exchange_name=self.exchange.name,
|
||||
@@ -338,7 +350,10 @@ class ExchangeBundle:
|
||||
reader = BcolzMinuteBarReader(path)
|
||||
|
||||
start = reader.first_trading_day
|
||||
end = reader.last_available_dt
|
||||
|
||||
# TODO: temp workaround, remove when the bundles are fixed
|
||||
# end = reader.last_available_dt
|
||||
end = reader.last_available_dt - timedelta(days=1)
|
||||
|
||||
periods = self.calendar.minutes_in_range(start, end)
|
||||
|
||||
@@ -363,20 +378,23 @@ class ExchangeBundle:
|
||||
index=periods
|
||||
)
|
||||
|
||||
if verify:
|
||||
nan_rows = df[df.isnull().T.any().T].index
|
||||
if len(nan_rows) > 0:
|
||||
raise EmptyValuesInBundleError(
|
||||
path=path,
|
||||
start=nan_rows[0],
|
||||
end=nan_rows[-1]
|
||||
)
|
||||
|
||||
data = []
|
||||
if not df.empty:
|
||||
df.sort_index(inplace=True, ascending=False)
|
||||
df.sort_index(inplace=True)
|
||||
|
||||
data.append((sid, df))
|
||||
|
||||
self._write(data, writer, data_frequency)
|
||||
|
||||
end_time = time.time()
|
||||
delta_time = end_time - start_time
|
||||
|
||||
log.info('time elapsed {}'.format(delta_time))
|
||||
|
||||
pass
|
||||
return path
|
||||
|
||||
def ingest(self, data_frequency, include_symbols=None,
|
||||
exclude_symbols=None, start=None, end=None,
|
||||
@@ -411,11 +429,19 @@ class ExchangeBundle:
|
||||
periods.append(period)
|
||||
|
||||
month_range = calendar.monthrange(dt.year, dt.month)
|
||||
month_start = date(dt.year, dt.month, month_range[0])
|
||||
month_end = date(dt.year, dt.month, month_range[1])
|
||||
month_start = pd.to_datetime(
|
||||
datetime(dt.year, dt.month, 1, 0, 0, 0, 0),
|
||||
utc=True)
|
||||
|
||||
if not range_in_bundle(asset, month_start, month_end,
|
||||
reader):
|
||||
# TODO: workaround, remove when bundles are fixed
|
||||
month_end = pd.to_datetime(
|
||||
datetime(dt.year, dt.month, month_range[1] - 1,
|
||||
23, 59, 0, 0),
|
||||
utc=True)
|
||||
has_data = \
|
||||
range_in_bundle(asset, month_start, month_end, reader)
|
||||
|
||||
if not has_data:
|
||||
log.debug('adding period: {}'.format(period))
|
||||
chunks.append(
|
||||
dict(
|
||||
@@ -445,4 +471,3 @@ class ExchangeBundle:
|
||||
period=chunk['period'],
|
||||
writer=writer
|
||||
)
|
||||
pass
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
import sys, traceback
|
||||
from catalyst.errors import ZiplineError
|
||||
|
||||
|
||||
def silent_except_hook(exctype, excvalue, exctraceback):
|
||||
if exctype in [ PricingDataBeforeTradingError, PricingDataNotLoadedError,
|
||||
SymbolNotFoundOnExchange, ]:
|
||||
if exctype in [PricingDataBeforeTradingError, PricingDataNotLoadedError,
|
||||
SymbolNotFoundOnExchange, ]:
|
||||
fn = traceback.extract_tb(exctraceback)[-1][0]
|
||||
ln = traceback.extract_tb(exctraceback)[-1][1]
|
||||
print "Error traceback: {1} (line {2})\n" \
|
||||
"{0.__name__}: {3}".format(exctype, fn, ln, excvalue)
|
||||
"{0.__name__}: {3}".format(exctype, fn, ln, excvalue)
|
||||
else:
|
||||
sys.__excepthook__(exctype, excvalue, exctraceback)
|
||||
|
||||
|
||||
sys.excepthook = silent_except_hook
|
||||
|
||||
|
||||
@@ -168,6 +170,11 @@ class BundleNotFoundError(ZiplineError):
|
||||
'See catalyst documentation for details.').strip()
|
||||
|
||||
|
||||
class EmptyValuesInBundleError(ZiplineError):
|
||||
msg = ('Found empty values in bundle {path} between '
|
||||
'{start} and {end}.').strip()
|
||||
|
||||
|
||||
class PricingDataBeforeTradingError(ZiplineError):
|
||||
msg = ('Pricing data for trading pairs {symbols} on exchange {exchange} '
|
||||
'starts on {first_trading_day}, but you are either trying to trade or '
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
from datetime import timedelta, time
|
||||
from logging import Logger
|
||||
|
||||
import bcolz
|
||||
from toolz.itertoolz import join as joinz
|
||||
import pandas as pd
|
||||
|
||||
from catalyst.exchange.exchange_bundle import ExchangeBundle
|
||||
@@ -13,18 +10,18 @@ log = Logger('test_exchange_bundle')
|
||||
|
||||
class ExchangeBundleTestCase:
|
||||
def test_ingest_minute(self):
|
||||
exchange_name = 'bitfinex'
|
||||
exchange_name = 'poloniex'
|
||||
|
||||
# start = pd.to_datetime('2017-09-01', utc=True)
|
||||
start = pd.to_datetime('2017-10-01', utc=True)
|
||||
end = pd.to_datetime('2017-10-06', utc=True)
|
||||
start = pd.to_datetime('2017-1-1', utc=True)
|
||||
end = pd.to_datetime('2017-6-30', utc=True)
|
||||
|
||||
exchange_bundle = ExchangeBundle(get_exchange(exchange_name))
|
||||
|
||||
log.info('ingesting exchange bundle {}'.format(exchange_name))
|
||||
exchange_bundle.ingest(
|
||||
data_frequency='minute',
|
||||
include_symbols='bcc_btc',
|
||||
include_symbols='btc_usdt',
|
||||
exclude_symbols=None,
|
||||
start=start,
|
||||
end=end,
|
||||
@@ -77,8 +74,8 @@ class ExchangeBundleTestCase:
|
||||
exchange = get_exchange(exchange_name)
|
||||
asset = exchange.get_asset('btc_usdt')
|
||||
|
||||
start = pd.to_datetime('2017-09-01', utc=True)
|
||||
end = pd.to_datetime('2017-09-06', utc=True)
|
||||
start = pd.to_datetime('2017-5-1', utc=True)
|
||||
end = pd.to_datetime('2017-5-31', utc=True)
|
||||
|
||||
exchange_bundle = ExchangeBundle(exchange)
|
||||
|
||||
@@ -86,7 +83,8 @@ class ExchangeBundleTestCase:
|
||||
exchange_bundle.ingest_ctable(
|
||||
asset=asset,
|
||||
data_frequency=data_frequency,
|
||||
period='2017-9',
|
||||
writer=writer
|
||||
period='2017-5',
|
||||
writer=writer,
|
||||
verify=True
|
||||
)
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user