Trying to fix an issue with periodical bars

This commit is contained in:
fredfortier
2017-09-20 18:00:08 -04:00
parent 3b655d466e
commit 4e2d092123
6 changed files with 263 additions and 277 deletions
+15 -5
View File
@@ -232,7 +232,8 @@ class Bitfinex(Exchange):
# TODO: fetch account data and keep in cache
return None
def get_candles(self, data_frequency, assets, bar_count=None, end_dt=None):
def get_candles(self, data_frequency, assets, bar_count=None,
start_dt=None, end_dt=None):
"""
Retrieve OHLVC candles from Bitfinex
@@ -289,11 +290,18 @@ class Bitfinex(Exchange):
is_list = True
url += '/hist?limit={}'.format(int(bar_count))
if end_dt is not None:
def get_ms(date):
epoch = datetime.datetime.utcfromtimestamp(0)
epoch = epoch.replace(tzinfo=pytz.UTC)
end_ms = (end_dt - epoch).total_seconds() * 1000.0
return (date - epoch).total_seconds() * 1000.0
if start_dt is not None:
start_ms = get_ms(start_dt)
url += '&start={0:f}'.format(start_ms)
if end_dt is not None:
end_ms = get_ms(end_dt)
url += '&end={0:f}'.format(end_ms)
else:
@@ -315,6 +323,9 @@ class Bitfinex(Exchange):
candles = response.json()
def ohlc_from_candle(candle):
last_traded = pd.Timestamp.utcfromtimestamp(
candle[0] / 1000.0)
last_traded = last_traded.replace(tzinfo=pytz.UTC)
ohlc = dict(
open=np.float64(candle[1]),
high=np.float64(candle[3]),
@@ -322,8 +333,7 @@ class Bitfinex(Exchange):
close=np.float64(candle[2]),
volume=np.float64(candle[5]),
price=np.float64(candle[2]),
last_traded=pd.Timestamp.utcfromtimestamp(
candle[0] / 1000.0)
last_traded=last_traded
)
return ohlc
+18 -9
View File
@@ -331,20 +331,29 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
else:
raise ValueError('Unsupported frequency')
values = []
for asset in assets:
if isinstance(assets, TradingPair):
try:
value = reader.get_value(
sid=asset.sid,
sid=assets.sid,
dt=dt,
field=field
)
values.append(value)
return value
except Exception as e:
log.warn('minute data not found: {}'.format(e))
values.append(None)
if len(assets) == 1:
return values[0]
return None
else:
return values
values = []
for asset in assets:
try:
value = reader.get_value(
sid=asset.sid,
dt=dt,
field=field
)
values.append(value)
except Exception as e:
log.warn('minute data not found: {}'.format(e))
values.append(None)
return values
+80 -72
View File
@@ -49,7 +49,7 @@ from catalyst.utils.input_validation import error_keywords, ensure_upper_case, \
expect_types
from catalyst.utils.preprocess import preprocess
log = logbook.Logger("ExchangeTradingAlgorithm")
log = logbook.Logger('exchange_algorithm')
class ExchangeAlgorithmExecutor(AlgorithmSimulator):
@@ -59,6 +59,8 @@ class ExchangeAlgorithmExecutor(AlgorithmSimulator):
class ExchangeTradingAlgorithmBase(TradingAlgorithm):
def __init__(self, *args, **kwargs):
self.exchanges = kwargs.pop('exchanges', None)
super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs)
@api_method
@@ -106,10 +108,83 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm):
as_of_date=_lookup_date
)
def prepare_period_stats(self, start_dt, end_dt):
"""
Creates a dictionary representing the state of the tracker.
class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase):
I rewrote this in an attempt to better control the stats.
I don't want things to happen magically through complex logic
pertaining to backtesting.
"""
tracker = self.perf_tracker
period = tracker.todays_performance
pos_stats = period.position_tracker.stats()
period_stats = calc_period_stats(pos_stats, period.ending_cash)
stats = dict(
period_start=tracker.period_start,
period_end=tracker.period_end,
capital_base=tracker.capital_base,
progress=tracker.progress,
ending_value=period.ending_value,
ending_exposure=period.ending_exposure,
capital_used=period.cash_flow,
starting_value=period.starting_value,
starting_exposure=period.starting_exposure,
starting_cash=period.starting_cash,
ending_cash=period.ending_cash,
portfolio_value=period.ending_cash + period.ending_value,
pnl=period.pnl,
returns=period.returns,
period_open=period.period_open,
period_close=period.period_close,
gross_leverage=period_stats.gross_leverage,
net_leverage=period_stats.net_leverage,
short_exposure=pos_stats.short_exposure,
long_exposure=pos_stats.long_exposure,
short_value=pos_stats.short_value,
long_value=pos_stats.long_value,
longs_count=pos_stats.longs_count,
shorts_count=pos_stats.shorts_count,
)
# Merging cumulative risk
stats.update(tracker.cumulative_risk_metrics.to_dict())
# Merging latest recorded variables
stats.update(self.recorded_vars)
stats['positions'] = period.position_tracker.get_positions_list()
# we want the key to be absent, not just empty
# Only include transactions for given dt
stats['transactions'] = dict()
for date in period.processed_transactions:
if start_dt <= date < end_dt:
stats['transactions'][date] = \
period.processed_transactions[date]
stats['orders'] = dict()
for date in period.orders_by_modified:
if start_dt <= date < end_dt:
stats['orders'][date] = \
period.orders_by_modified[date]
return stats
class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase):
def __init__(self, *args, **kwargs):
super(ExchangeTradingAlgorithmBacktest, self).__init__(*args, **kwargs)
log.info('initialized trading algorithm in backtest mode')
class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
def __init__(self, *args, **kwargs):
self.exchanges = kwargs.pop('exchanges', None)
self.algo_namespace = kwargs.pop('algo_namespace', None)
self.live_graph = kwargs.pop('live_graph', None)
@@ -134,13 +209,13 @@ class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase):
self.stats_minutes = 5
super(ExchangeTradingAlgorithm, self).__init__(*args, **kwargs)
super(ExchangeTradingAlgorithmLive, self).__init__(*args, **kwargs)
# TODO: fix precision before re-enabling
# self._create_minute_writer()
signal.signal(signal.SIGINT, self.signal_handler)
log.info('exchange trading algorithm successfully initialized')
log.info('initialized trading algorithm in live mode')
def _create_minute_writer(self):
root = get_exchange_minute_writer_root(self.exchange.name)
@@ -360,73 +435,6 @@ class ExchangeTradingAlgorithm(ExchangeTradingAlgorithmBase):
save_algo_df(self.algo_namespace, 'exposure_stats',
self.exposure_stats)
def prepare_period_stats(self, start_dt, end_dt):
"""
Creates a dictionary representing the state of the tracker.
I rewrote this in an attempt to better control the stats.
I don't want things to happen magically through complex logic
pertaining to backtesting.
"""
tracker = self.perf_tracker
period = tracker.todays_performance
pos_stats = period.position_tracker.stats()
period_stats = calc_period_stats(pos_stats, period.ending_cash)
stats = dict(
period_start=tracker.period_start,
period_end=tracker.period_end,
capital_base=tracker.capital_base,
progress=tracker.progress,
ending_value=period.ending_value,
ending_exposure=period.ending_exposure,
capital_used=period.cash_flow,
starting_value=period.starting_value,
starting_exposure=period.starting_exposure,
starting_cash=period.starting_cash,
ending_cash=period.ending_cash,
portfolio_value=period.ending_cash + period.ending_value,
pnl=period.pnl,
returns=period.returns,
period_open=period.period_open,
period_close=period.period_close,
gross_leverage=period_stats.gross_leverage,
net_leverage=period_stats.net_leverage,
short_exposure=pos_stats.short_exposure,
long_exposure=pos_stats.long_exposure,
short_value=pos_stats.short_value,
long_value=pos_stats.long_value,
longs_count=pos_stats.longs_count,
shorts_count=pos_stats.shorts_count,
)
# Merging cumulative risk
stats.update(tracker.cumulative_risk_metrics.to_dict())
# Merging latest recorded variables
stats.update(self.recorded_vars)
stats['positions'] = period.position_tracker.get_positions_list()
# we want the key to be absent, not just empty
# Only include transactions for given dt
stats['transactions'] = dict()
for date in period.processed_transactions:
if start_dt <= date < end_dt:
stats['transactions'][date] = \
period.processed_transactions[date]
stats['orders'] = dict()
for date in period.orders_by_modified:
if start_dt <= date < end_dt:
stats['orders'][date] = \
period.orders_by_modified[date]
return stats
def handle_data(self, data):
if not self.is_running:
return
+65 -36
View File
@@ -1,17 +1,12 @@
from datetime import timedelta
from time import sleep
import os
import pandas as pd
from catalyst.data.bundles.base_pricing import BaseCryptoPricingBundle
from catalyst import get_calendar
import numpy as np
from logbook import Logger, INFO
from catalyst import get_calendar
from catalyst.data.five_minute_bars import BcolzFiveMinuteOverlappingData
from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
BcolzMinuteBarReader
from catalyst.data.minute_bars import BcolzMinuteOverlappingData
from catalyst.exchange.bitfinex.bitfinex import Bitfinex
from catalyst.exchange.bittrex.bittrex import Bittrex
from catalyst.exchange.exchange_errors import ExchangeNotFoundError
@@ -27,10 +22,12 @@ log = Logger('exchange_bundle')
def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count):
calc_start_dt = end_dt - timedelta(minutes=bar_count)
candles = exchange.get_candles(
data_frequency=data_frequency,
assets=assets,
bar_count=bar_count,
start_dt=calc_start_dt,
end_dt=end_dt
)
@@ -39,10 +36,34 @@ def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count):
for asset in assets:
asset_candles = candles[asset]
candle_start_dt = None
candle_end_dt = None
for candle in asset_candles:
last_traded = candle['last_traded']
if candle_start_dt is None or candle_start_dt > last_traded:
candle_start_dt = last_traded
if candle_end_dt is None or candle_end_dt < last_traded:
candle_end_dt = last_traded
if candle_end_dt < end_dt:
asset_candles.append(
dict(
open=None,
high=None,
close=None,
low=None,
volume=None,
last_traded=end_dt
)
)
asset_df = pd.DataFrame(asset_candles)
if not asset_df.empty:
asset_df.set_index('last_traded', inplace=True, drop=True)
asset_df.sort_index(inplace=True)
asset_df = asset_df.resample('1T').ffill()
series[asset] = asset_df
@@ -77,12 +98,13 @@ def process_bar_data(exchange, assets, writer, data_frequency,
bar_count = exchange.num_candles_limit
chunks = []
last_chunk_date = end
last_chunk_date = end.floor('1 min')
while last_chunk_date > start + timedelta(minutes=bar_count):
# TODO: account for the partial last bar
chunk = dict(end=last_chunk_date, bar_count=bar_count)
chunks.append(chunk)
# TODO: base on frequency
last_chunk_date = \
last_chunk_date - timedelta(minutes=(bar_count + 1))
@@ -119,20 +141,29 @@ def process_bar_data(exchange, assets, writer, data_frequency,
)
continue
num_candles = 0
data = []
for asset in assets_candles_dict:
df = assets_candles_dict[asset]
sid = asset.sid
num_candles += len(df.values)
data.append((sid, df))
try:
log.debug(
'writing chunk {start} to {end}'.format(
start=chunk['end'] - timedelta(
minutes=chunk['bar_count']),
log.info(
'writing {num_candles} candles from {start} to {end}'.format(
num_candles=num_candles,
start=chunk['end'] - \
timedelta(minutes=chunk['bar_count']),
end=chunk['end']
)
)
for pair in data:
log.info('data for sid {}\n{}\n{}'.format(
pair[0], pair[1].head(2), pair[1].tail(2)))
writer.write(
data=data,
show_progress=False,
@@ -258,27 +289,27 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None,
if start >= end:
raise ValueError('start date cannot be after end date')
if daily_bar_writer is not None:
process_bar_data(
exchange=exchange,
assets=assets,
writer=daily_bar_writer,
data_frequency='daily',
show_progress=show_progress,
start=start,
end=end
)
if five_minute_bar_writer is not None:
process_bar_data(
exchange=exchange,
assets=assets,
writer=five_minute_bar_writer,
data_frequency='5-minute',
show_progress=show_progress,
start=start,
end=end
)
# if daily_bar_writer is not None:
# process_bar_data(
# exchange=exchange,
# assets=assets,
# writer=daily_bar_writer,
# data_frequency='daily',
# show_progress=show_progress,
# start=start,
# end=end
# )
#
# if five_minute_bar_writer is not None:
# process_bar_data(
# exchange=exchange,
# assets=assets,
# writer=five_minute_bar_writer,
# data_frequency='5-minute',
# show_progress=show_progress,
# start=start,
# end=end
# )
if minute_bar_writer is not None:
process_bar_data(
@@ -292,5 +323,3 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None,
)
return ingest
+83 -153
View File
@@ -1,14 +1,12 @@
import os
import re
from runpy import run_path
import sys
import warnings
from time import sleep
from datetime import timedelta
import pandas as pd
from runpy import run_path
from time import sleep
import click
import pandas as pd
from catalyst.exchange.bittrex.bittrex import Bittrex
@@ -23,22 +21,15 @@ except:
from toolz import valfilter, concatv
from functools import partial
from catalyst.algorithm import TradingAlgorithm
from catalyst.data.bundles.core import load
from catalyst.data.data_portal import DataPortal
from catalyst.data.loader import load_crypto_market_data
from catalyst.finance.trading import TradingEnvironment
from catalyst.pipeline.data import USEquityPricing, CryptoPricing
from catalyst.pipeline.loaders import (
USEquityPricingLoader,
CryptoPricingLoader,
)
from catalyst.utils.calendars import get_calendar
from catalyst.utils.factory import create_simulation_parameters
import catalyst.utils.paths as pth
from catalyst.exchange.exchange_algorithm import ExchangeTradingAlgorithm
from catalyst.exchange.data_portal_exchange import DataPortalExchangeLive
from catalyst.exchange.exchange_algorithm import ExchangeTradingAlgorithmLive, \
ExchangeTradingAlgorithmBacktest
from catalyst.exchange.data_portal_exchange import DataPortalExchangeLive, \
DataPortalExchangeBacktest
from catalyst.exchange.bitfinex.bitfinex import Bitfinex
from catalyst.exchange.asset_finder_exchange import AssetFinderExchange
from catalyst.exchange.exchange_portfolio import ExchangePortfolio
@@ -148,49 +139,46 @@ def _run(handle_data,
mode = 'live' if live else 'backtest'
log.info('running algo in {mode} mode'.format(mode=mode))
if live and exchange is not None:
exchange_name = exchange
exchange_name = exchange
if exchange_name is None:
raise ValueError('Please specify at least one exchange.')
start = pd.Timestamp.utcnow()
# TODO: fix the end data.
end = start + timedelta(hours=8760)
exchange_list = [x.strip().lower() for x in exchange.split(',')]
exchange_list = [x.strip().lower() for x in exchange.split(',')]
exchanges = dict()
for exchange_name in exchange_list:
exchanges = dict()
for exchange_name in exchange_list:
# Looking for the portfolio from the cache first
portfolio = get_algo_object(
algo_name=algo_namespace,
key='portfolio_{}'.format(exchange_name),
environ=environ
)
# Looking for the portfolio from the cache first
portfolio = get_algo_object(
algo_name=algo_namespace,
key='portfolio_{}'.format(exchange_name),
environ=environ
if portfolio is None:
portfolio = ExchangePortfolio(
start_date=pd.Timestamp.utcnow()
)
if portfolio is None:
portfolio = ExchangePortfolio(
start_date=pd.Timestamp.utcnow()
)
# This corresponds to the json file containing api token info
exchange_auth = get_exchange_auth(exchange_name)
if exchange_name == 'bitfinex':
exchanges[exchange_name] = Bitfinex(
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=base_currency,
portfolio=portfolio
)
elif exchange_name == 'bittrex':
exchanges[exchange_name] = Bittrex(
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=base_currency,
portfolio=portfolio
)
else:
raise ExchangeNotFoundError(exchange_name=exchange_name)
# This corresponds to the json file containing api token info
exchange_auth = get_exchange_auth(exchange_name)
if exchange_name == 'bitfinex':
exchanges[exchange_name] = Bitfinex(
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=base_currency,
portfolio=portfolio
)
elif exchange_name == 'bittrex':
exchanges[exchange_name] = Bittrex(
key=exchange_auth['key'],
secret=exchange_auth['secret'],
base_currency=base_currency,
portfolio=portfolio
)
else:
raise ExchangeNotFoundError(exchange_name=exchange_name)
open_calendar = get_calendar('OPEN')
sim_params = create_simulation_parameters(
@@ -201,13 +189,19 @@ def _run(handle_data,
emission_rate=data_frequency,
)
if live and exchange is not None:
env = TradingEnvironment(
environ=environ,
exchange_tz='UTC',
asset_db_path=None
)
env.asset_finder = AssetFinderExchange()
env = TradingEnvironment(
environ=environ,
exchange_tz='UTC',
asset_db_path=None # We don't need an asset db, we have exchanges
)
env.asset_finder = AssetFinderExchange()
choose_loader = None # TODO: use the DataPortal for in the algorithm class for this
if live:
start = pd.Timestamp.utcnow()
# TODO: fix the end data.
end = start + timedelta(hours=8760)
data = DataPortalExchangeLive(
exchanges=exchanges,
@@ -215,7 +209,6 @@ def _run(handle_data,
trading_calendar=open_calendar,
first_trading_day=pd.to_datetime('today', utc=True)
)
choose_loader = None
def fetch_capital_base(exchange, attempt_index=0):
"""
@@ -264,102 +257,34 @@ def _run(handle_data,
data_frequency='minute'
)
elif bundle is not None:
bundles = bundle.split(',')
def get_trading_env_and_data(bundles):
env = data = None
b = 'poloniex'
if len(bundles) == 0:
return env, data
elif len(bundles) == 1:
b = bundles[0]
bundle_data = load(
b,
environ,
bundle_timestamp,
)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
env = TradingEnvironment(
load=partial(load_crypto_market_data, bundle=b,
bundle_data=bundle_data, environ=environ),
bm_symbol='USDT_BTC',
trading_calendar=open_calendar,
asset_db_path=connstr,
environ=environ,
)
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
minute_reader=bundle_data.minute_bar_reader,
five_minute_reader=bundle_data.five_minute_bar_reader,
daily_reader=bundle_data.daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
return env, data
def get_loader_for_bundle(b):
bundle_data = load(
b,
environ,
bundle_timestamp,
)
if b == 'poloniex':
return CryptoPricingLoader(
bundle_data,
data_frequency,
CryptoPricing,
)
elif b == 'quandl':
return USEquityPricingLoader(
bundle_data,
data_frequency,
USEquityPricing,
)
raise ValueError(
"No PipelineLoader registered for bundle %s." % b
)
loaders = [get_loader_for_bundle(b) for b in bundles]
env, data = get_trading_env_and_data(bundles)
def choose_loader(column):
for loader in loaders:
if column in loader.columns:
return loader
raise ValueError(
"No PipelineLoader registered for column %s." % column
)
algorithm_class = partial(
ExchangeTradingAlgorithmLive,
exchanges=exchanges,
algo_namespace=algo_namespace,
live_graph=live_graph
)
else:
env = TradingEnvironment(environ=environ)
choose_loader = None
# Removed the existing Poloniex fork to keep things simple
# We can add back the complexity if required.
TradingAlgorithmClass = (
partial(ExchangeTradingAlgorithm, exchanges=exchanges,
algo_namespace=algo_namespace, live_graph=live_graph)
if live and exchanges else TradingAlgorithm) # TODO: backtest trading algo class
# I don't think that we should have arbitrary price data bundles
# Instead, we should center this data around exchanges.
# We still need to support bundles for other misc data, but we
# can handle this later.
perf = TradingAlgorithmClass(
data = DataPortalExchangeBacktest(
exchanges=exchanges,
asset_finder=env.asset_finder,
trading_calendar=open_calendar,
first_trading_day=start,
)
algorithm_class = partial(
ExchangeTradingAlgorithmBacktest,
exchanges=exchanges
)
perf = algorithm_class(
namespace=namespace,
env=env,
get_pipeline_loader=choose_loader,
@@ -530,6 +455,11 @@ def run_algorithm(initialize,
"""
load_extensions(default_extension, extensions, strict_extensions, environ)
# I'm not sure that we need this since the modified DataPortal
# does not require extensions to be explicitly loaded.
# This will be useful for arbitrary non-pricing bundles but we may
# need to modify the logic.
if not live:
non_none_data = valfilter(bool, {
'data': data is not None,
+2 -2
View File
@@ -50,7 +50,7 @@ class ExchangeDataPortalTestCase:
exchanges=dict(bitfinex=self.bitfinex),
asset_finder=asset_finder,
trading_calendar=open_calendar,
first_trading_day=pd.to_datetime('today', utc=True)
first_trading_day=pd.to_datetime('2017-09-10', utc=True)
)
def test_get_history_window_live(self):
@@ -90,7 +90,7 @@ class ExchangeDataPortalTestCase:
asset_finder.lookup_symbol('neo_btc', self.bitfinex),
]
date = pd.Timestamp.utcnow() - timedelta(hours=8)
date = pd.to_datetime('2017-09-10 9:00', utc=True)
value = self.data_portal_backtest.get_spot_value(
assets, 'close', date, 'minute')
pass