Bug fixes and trying to cache minute data

This commit is contained in:
Frederic Fortier
2017-08-22 14:59:47 -04:00
parent a5ca8d0f15
commit 8b81e66d57
7 changed files with 163 additions and 57 deletions
+22 -12
View File
@@ -18,10 +18,10 @@ log = Logger(algo_namespace)
def initialize(context):
log.info('initializing algo')
context.ASSET_NAME = 'IOT_USD'
context.ASSET_NAME = 'XRP_USD'
context.asset = symbol(context.ASSET_NAME)
context.TARGET_POSITIONS = 200
context.TARGET_POSITIONS = 7
context.PROFIT_TARGET = 0.1
context.SLIPPAGE_ALLOWED = 0.02
@@ -44,12 +44,12 @@ def _handle_data(context, data):
log.info('got rsi: {}'.format(rsi))
# Buying more when RSI is low, this should lower our cost basis
if rsi <= 40:
buy_increment = 2
elif rsi <= 30:
buy_increment = 5
else:
if rsi <= 30:
buy_increment = 1
elif rsi <= 40:
buy_increment = 0.5
else:
buy_increment = 0.1
cash = context.portfolio.cash
log.info('base currency available: {cash}'.format(cash=cash))
@@ -76,10 +76,6 @@ def _handle_data(context, data):
if context.asset in context.portfolio.positions:
position = context.portfolio.positions[context.asset]
if position.amount >= context.TARGET_POSITIONS:
log.info('reached positions target: {}'.format(position.amount))
return
cost_basis = position.cost_basis
log.info(
'found {amount} positions with cost basis {cost_basis}'.format(
@@ -87,6 +83,20 @@ def _handle_data(context, data):
cost_basis=cost_basis
)
)
# if position.amount > 0:
# order_target_percent(
# asset=context.asset,
# target=0,
# limit_price=price * (1 - context.SLIPPAGE_ALLOWED),
# )
# log.debug('liquidated the position')
# return
if position.amount >= context.TARGET_POSITIONS:
log.info('reached positions target: {}'.format(position.amount))
return
if price < cost_basis:
is_buy = True
elif price > cost_basis * (1 + context.PROFIT_TARGET) or rsi > 70:
@@ -120,7 +130,7 @@ def handle_data(context, data):
log.info('handling bar {}'.format(data.current_dt))
try:
_handle_data(context, data)
except ZiplineError as e:
except Exception as e:
log.warn('aborting the bar on error {}'.format(e))
context.errors.append(e)
+36 -1
View File
@@ -10,6 +10,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from datetime import timedelta
@@ -20,6 +21,8 @@ import pandas as pd
import catalyst.protocol as zp
from catalyst.algorithm import TradingAlgorithm
from catalyst.data.minute_bars import BcolzMinuteBarWriter, \
BcolzMinuteBarReader
from catalyst.errors import OrderInBeforeTradingStart
from catalyst.exchange.exchange_clock import ExchangeClock
from catalyst.exchange.exchange_errors import (
@@ -27,6 +30,7 @@ from catalyst.exchange.exchange_errors import (
ExchangePortfolioDataError,
ExchangeTransactionError
)
from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root
from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object
from catalyst.finance.performance.period import calc_period_stats
from catalyst.gens.tradesimulation import AlgorithmSimulator
@@ -58,9 +62,32 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
self.retry_delay = 5
super(self.__class__, self).__init__(*args, **kwargs)
self._create_minute_writer()
signal.signal(signal.SIGINT, self.signal_handler)
log.info('exchange trading algorithm successfully initialized')
def _create_minute_writer(self):
    """
    Attach a bcolz minute-bar writer and reader to the exchange.

    The storage root comes from get_exchange_minute_writer_root. When a
    'metadata.json' file is already present under that root, the existing
    store is re-opened; otherwise a new writer is created with
    write_metadata=True. The resulting writer and a reader on the same
    root are stored on self.exchange.
    """
    data_root = get_exchange_minute_writer_root(self.exchange.name)
    has_metadata = os.path.isfile(os.path.join(data_root, 'metadata.json'))

    if not has_metadata:
        # No store yet: build one from the simulation parameters.
        bar_writer = BcolzMinuteBarWriter(
            rootdir=data_root,
            calendar=self.trading_calendar,
            minutes_per_day=1440,  # 24 * 60, i.e. every minute of the day
            start_session=self.sim_params.start_session,
            end_session=self.sim_params.end_session,
            write_metadata=True
        )
    else:
        bar_writer = BcolzMinuteBarWriter.open(
            data_root, self.sim_params.end_session)

    self.exchange.minute_writer = bar_writer
    self.exchange.minute_reader = BcolzMinuteBarReader(data_root)
def signal_handler(self, signal, frame):
self.is_running = False
@@ -103,7 +130,6 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
execution_closes = \
self.trading_calendar.execution_time_from_close(market_closes)
signal.signal(signal.SIGINT, self.signal_handler)
return ExchangeClock(
self.sim_params.sessions,
@@ -343,6 +369,15 @@ class ExchangeTradingAlgorithm(TradingAlgorithm):
self.perf_tracker.process_order(order)
return order
def round_order(self, amount):
"""
Return the order amount unchanged, without any rounding.

We need fractions with cryptocurrencies.

:param amount: the requested order amount
:return: the same ``amount`` value that was passed in
"""
return amount
@api_method
def batch_market_order(self, share_counts):
raise NotImplementedError()
+15 -9
View File
@@ -1,4 +1,5 @@
import base64
import numpy as np
import hashlib
import hmac
import json
@@ -18,7 +19,6 @@ from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
InvalidHistoryFrequencyError
)
from catalyst.exchange.exchange_utils import get_exchange_symbols
from catalyst.finance.execution import (MarketOrder,
LimitOrder,
StopOrder,
@@ -44,9 +44,11 @@ class Bitfinex(Exchange):
self.id = 'b'
self.name = 'bitfinex'
self.assets = {}
self.load_assets(get_exchange_symbols(self.name))
self.load_assets()
self.base_currency = base_currency
self._portfolio = portfolio
self.minute_writer = None
self.minute_reader = None
def _request(self, operation, data, version='v1'):
payload_object = {
@@ -288,6 +290,8 @@ class Bitfinex(Exchange):
'1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D',
'1M'
"""
# TODO: use BcolzMinuteBarReader to read from cache
freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I)
if freq_match:
number = int(freq_match.group(1))
@@ -347,16 +351,18 @@ class Bitfinex(Exchange):
candles = response.json()
def ohlc_from_candle(candle):
return dict(
open=candle[1],
high=candle[3],
low=candle[4],
close=candle[2],
volume=candle[5],
price=candle[2],
ohlc = dict(
open=np.float64(candle[1]),
high=np.float64(candle[3]),
low=np.float64(candle[4]),
close=np.float64(candle[2]),
volume=np.float64(candle[5]),
price=np.float64(candle[2]),
last_traded=pd.Timestamp.utcfromtimestamp(
candle[0] / 1000.0),
minute_dt=pd.Timestamp.utcnow().floor('1 min')
)
return ohlc
if is_list:
ohlc_bars = []
+75 -26
View File
@@ -1,8 +1,11 @@
import abc
import random
from time import sleep
import collections
from abc import ABCMeta, abstractmethod, abstractproperty
from datetime import timedelta
import numpy as np
import pandas as pd
from catalyst.assets._assets import Asset
from logbook import Logger
@@ -13,6 +16,7 @@ from catalyst.errors import (
)
from catalyst.finance.order import ORDER_STATUS
from catalyst.finance.transaction import Transaction
from catalyst.exchange.exchange_utils import get_exchange_symbols
log = Logger('Exchange')
@@ -25,6 +29,8 @@ class Exchange:
self.trading_pairs = None
self.assets = {}
self._portfolio = None
self.minute_writer = None
self.minute_reader = None
@abstractmethod
def subscribe_to_market_data(self, symbol):
@@ -100,20 +106,7 @@ class Exchange:
return asset
@staticmethod
def asset_parser(asset):
"""
Helper method to de-serialize Asset objects correctly.
:param asset:
:return:
"""
for key in asset:
if key == 'start_date':
asset[key] = pd.to_datetime(asset[key], utc=True)
return asset
def load_assets(self, symbol_map):
def load_assets(self):
"""
Populate the 'assets' attribute with a dictionary of Assets.
The key of the resulting dictionary is the exchange specific
@@ -121,23 +114,31 @@ class Exchange:
'symbol' attribute of each asset.
Note
----
Notes
-----
The sid of each asset is calculated based on a numeric hash of the
universal symbol. This simple approach avoids maintaining a mapping
of sids.
:param symbol_map:
:return:
This method can be overridden if an exchange offers equivalent data
via its api.
"""
symbol_map = get_exchange_symbols(self.name)
for exchange_symbol in symbol_map:
asset = symbol_map[exchange_symbol]
symbol = asset['symbol']
asset_name = ' / '.join(symbol.split('_')).upper()
asset_obj = Asset(
sid=abs(hash(symbol_map[exchange_symbol]['symbol']))
% (10 ** 4),
symbol=symbol,
asset_name=asset_name,
sid=abs(hash(symbol)) % (10 ** 4),
exchange=self.name,
start_date=pd.to_datetime(asset['start_date'], utc=True),
end_date=pd.Timestamp.utcnow() + timedelta(minutes=300000),
**symbol_map[exchange_symbol]
)
self.assets[exchange_symbol] = asset_obj
def check_open_orders(self):
@@ -235,6 +236,13 @@ class Exchange:
"""
Similar to 'get_spot_value' but for a single asset
Note
----
We're writing each minute bar to disk using zipline's machinery.
This is especially useful when running multiple algorithms
concurrently. By using local data when possible, we try to avoid
reaching request limits on exchanges.
:param asset:
:param field:
:param data_frequency:
@@ -247,12 +255,53 @@ class Exchange:
)
)
ohlc = self.get_candles(data_frequency, asset)
if field not in ohlc:
raise KeyError('Invalid column: %s' % field)
if field == 'price':
field = 'close'
value = ohlc[field]
log.debug('got spot value: {}'.format(value))
# Don't use a timezone here
dt = pd.Timestamp.utcnow().floor('1 min')
value = None
if self.minute_reader is not None:
try:
# Slight delay to minimize the chances that multiple algos
# might try to hit the cache at the exact same time.
sleep_time = random.uniform(0.5, 0.8)
sleep(sleep_time)
# TODO: This does not always work! Why is that? Open an issue with zipline.
value = self.minute_reader.get_value(
sid=asset.sid,
dt=dt,
field=field
)
except Exception as e:
log.warn('minute data not found: {}'.format(e))
if value is None or np.isnan(value):
ohlc = self.get_candles(data_frequency, asset)
if field not in ohlc:
raise KeyError('Invalid column: %s' % field)
df = pd.DataFrame(
[ohlc],
index=pd.DatetimeIndex([dt]),
columns=['open', 'high', 'low', 'close', 'volume']
)
if self.minute_writer is not None:
try:
self.minute_writer.write_sid(
sid=asset.sid,
df=df
)
log.debug('wrote minute data: {}'.format(dt))
except Exception as e:
log.warn(
'unable to write minute data: {} {}'.format(dt, e))
value = ohlc[field]
log.debug('got spot value: {}'.format(value))
else:
log.debug('got spot value from cache: {}'.format(value))
return value
+5 -8
View File
@@ -58,16 +58,13 @@ class ExchangeClock(object):
while True:
current_time = pd.Timestamp.utcnow()
server_time = current_time.floor('1 min')
current_minute = current_time.floor('1 min')
if self._last_emit is None or server_time > self._last_emit:
log.debug('emitting minutely bar: {}'.format(server_time))
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))
self._last_emit = server_time
yield server_time, BAR
if self.minute_emission:
yield server_time, MINUTE_END
self._last_emit = current_minute
yield current_minute, BAR
else:
sleep(1)
+9
View File
@@ -95,3 +95,12 @@ def save_algo_object(algo_name, key, obj, environ=None):
with open(filename, 'wb') as handle:
pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)
def get_exchange_minute_writer_root(exchange_name, environ=None):
    """
    Return the path of the exchange's 'minute_data' folder.

    The path is the 'minute_data' sub-folder of whatever
    get_exchange_folder returns for the exchange. It is passed through
    ensure_directory before being returned (presumably creating it when
    missing -- confirm against ensure_directory).

    :param exchange_name: exchange name forwarded to get_exchange_folder
    :param environ: optional environment forwarded to get_exchange_folder
    :return: the 'minute_data' directory path
    """
    root = os.path.join(
        get_exchange_folder(exchange_name, environ),
        'minute_data'
    )
    ensure_directory(root)
    return root
+1 -1
View File
@@ -4,7 +4,7 @@
"start_date": "2010-01-01"
},
"ltcusd": {
"symbol": "ltc-usd",
"symbol": "ltc_usd",
"start_date": "2010-01-01"
},
"ltcbtc": {