Merge branch 'concurrent-exchanges' of github.com:enigmampc/catalyst into concurrent-exchanges

This commit is contained in:
Victor Grau Serrat
2017-10-16 22:35:57 -06:00
3 changed files with 71 additions and 21 deletions
+35 -11
View File
@@ -1,12 +1,9 @@
import calendar
import os
import shutil
import pytz
from datetime import timedelta, datetime
import pandas as pd
import numpy as np
from logbook import Logger, INFO
from catalyst import get_calendar
@@ -14,13 +11,11 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
BcolzMinuteBarWriter, BcolzMinuteBarReader, BcolzMinuteBarMetadata
from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
BcolzDailyBarReader
from catalyst.exchange.bundle_utils import get_ffill_candles, get_start_dt, \
get_periods, range_in_bundle, get_bcolz_chunk
from catalyst.exchange.bundle_utils import get_ffill_candles, range_in_bundle, \
get_bcolz_chunk, get_delta
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError
from catalyst.exchange.exchange_utils import get_exchange_folder, \
get_exchange_bundles_folder
from catalyst.exchange.exchange_utils import get_exchange_folder
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.deprecate import deprecated
from catalyst.utils.paths import ensure_directory
@@ -387,11 +382,40 @@ class ExchangeBundle:
if verify:
nan_rows = df[df.isnull().T.any().T].index
if len(nan_rows) > 0:
dates = []
previous_date = None
for row_date in nan_rows.values:
row_date = pd.to_datetime(row_date)
if previous_date is None:
dates.append(row_date)
else:
seq_date = previous_date + get_delta(1, data_frequency)
if row_date > seq_date:
dates.append(previous_date)
dates.append(row_date)
previous_date = row_date
dates.append(pd.to_datetime(nan_rows.values[-1]))
name = path.split('/')[-1]
log.warn(
'\n{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}'.format(
name=name,
end_minute=asset.end_minute,
dates=dates
)
)
raise EmptyValuesInBundleError(
path=path,
start=nan_rows[0],
end=nan_rows[-1]
name=name,
end_minute=asset.end_minute,
dates=dates
)
data = []
+2 -2
View File
@@ -171,8 +171,8 @@ class BundleNotFoundError(ZiplineError):
class EmptyValuesInBundleError(ZiplineError):
msg = ('Found empty values in bundle {path} between '
'{start} and {end}.').strip()
msg = ('{name} with end minute {end_minute} has empty rows '
'in ranges: {dates}').strip()
class PricingDataBeforeTradingError(ZiplineError):
+34 -8
View File
@@ -2,6 +2,8 @@ from logging import Logger
import pandas as pd
from catalyst.data.minute_bars import BcolzMinuteBarReader
from catalyst.exchange.bundle_utils import get_bcolz_chunk
from catalyst.exchange.exchange_bundle import ExchangeBundle
from catalyst.exchange.init_utils import get_exchange
@@ -69,6 +71,33 @@ class ExchangeBundleTestCase:
pass
def test_merge_ctables(self):
exchange_name = 'bitfinex'
data_frequency = 'minute'
exchange = get_exchange(exchange_name)
# asset = exchange.get_asset('gno_btc')
#
# start = pd.to_datetime('2017-5-1', utc=True)
# end = pd.to_datetime('2017-5-31', utc=True)
asset = exchange.get_asset('neo_btc')
start = pd.to_datetime('2017-9-1', utc=True)
end = pd.to_datetime('2017-9-30', utc=True)
exchange_bundle = ExchangeBundle(exchange)
writer = exchange_bundle.get_writer(start, end, data_frequency)
exchange_bundle.ingest_ctable(
asset=asset,
data_frequency=data_frequency,
period='2017-9',
writer=writer,
verify=True
)
pass
def test_minute_bundle(self):
exchange_name = 'poloniex'
data_frequency = 'minute'
@@ -78,14 +107,11 @@ class ExchangeBundleTestCase:
start = pd.to_datetime('2017-5-1', utc=True)
end = pd.to_datetime('2017-5-31', utc=True)
exchange_bundle = ExchangeBundle(exchange)
writer = exchange_bundle.get_writer(start, end, data_frequency)
exchange_bundle.ingest_ctable(
asset=asset,
path = get_bcolz_chunk(
exchange_name=exchange_name,
symbol=asset.symbol,
data_frequency=data_frequency,
period='2017-5',
writer=writer,
verify=True
period='2017-5'
)
reader = BcolzMinuteBarReader(path)
pass