diff --git a/catalyst/examples/buy_the_dip_live.py b/catalyst/examples/buy_the_dip_live.py index 73d56385..e240c17a 100644 --- a/catalyst/examples/buy_the_dip_live.py +++ b/catalyst/examples/buy_the_dip_live.py @@ -18,10 +18,10 @@ log = Logger(algo_namespace) def initialize(context): log.info('initializing algo') - context.ASSET_NAME = 'IOT_USD' + context.ASSET_NAME = 'XRP_USD' context.asset = symbol(context.ASSET_NAME) - context.TARGET_POSITIONS = 200 + context.TARGET_POSITIONS = 7 context.PROFIT_TARGET = 0.1 context.SLIPPAGE_ALLOWED = 0.02 @@ -44,12 +44,12 @@ def _handle_data(context, data): log.info('got rsi: {}'.format(rsi)) # Buying more when RSI is low, this should lower our cost basis - if rsi <= 40: - buy_increment = 2 - elif rsi <= 30: - buy_increment = 5 - else: + if rsi <= 30: buy_increment = 1 + elif rsi <= 40: + buy_increment = 0.5 + else: + buy_increment = 0.1 cash = context.portfolio.cash log.info('base currency available: {cash}'.format(cash=cash)) @@ -76,10 +76,6 @@ def _handle_data(context, data): if context.asset in context.portfolio.positions: position = context.portfolio.positions[context.asset] - if position.amount >= context.TARGET_POSITIONS: - log.info('reached positions target: {}'.format(position.amount)) - return - cost_basis = position.cost_basis log.info( 'found {amount} positions with cost basis {cost_basis}'.format( @@ -87,6 +83,20 @@ def _handle_data(context, data): cost_basis=cost_basis ) ) + + # if position.amount > 0: + # order_target_percent( + # asset=context.asset, + # target=0, + # limit_price=price * (1 - context.SLIPPAGE_ALLOWED), + # ) + # log.debug('liquidated the position') + # return + + if position.amount >= context.TARGET_POSITIONS: + log.info('reached positions target: {}'.format(position.amount)) + return + if price < cost_basis: is_buy = True elif price > cost_basis * (1 + context.PROFIT_TARGET) or rsi > 70: @@ -120,7 +130,7 @@ def handle_data(context, data): log.info('handling bar {}'.format(data.current_dt)) try: _handle_data(context, data) - except ZiplineError as e: + except Exception as e: log.warn('aborting the bar on error {}'.format(e)) context.errors.append(e) diff --git a/catalyst/exchange/algorithm_exchange.py b/catalyst/exchange/algorithm_exchange.py index 8de05186..72c46cf5 100644 --- a/catalyst/exchange/algorithm_exchange.py +++ b/catalyst/exchange/algorithm_exchange.py @@ -10,6 +10,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import signal import sys from datetime import timedelta @@ -20,6 +21,8 @@ import pandas as pd import catalyst.protocol as zp from catalyst.algorithm import TradingAlgorithm +from catalyst.data.minute_bars import BcolzMinuteBarWriter, \ + BcolzMinuteBarReader from catalyst.errors import OrderInBeforeTradingStart from catalyst.exchange.exchange_clock import ExchangeClock from catalyst.exchange.exchange_errors import ( @@ -27,6 +30,7 @@ from catalyst.exchange.exchange_errors import ( ExchangePortfolioDataError, ExchangeTransactionError ) +from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object from catalyst.finance.performance.period import calc_period_stats from catalyst.gens.tradesimulation import AlgorithmSimulator @@ -58,9 +62,32 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): self.retry_delay = 5 super(self.__class__, self).__init__(*args, **kwargs) + self._create_minute_writer() + + signal.signal(signal.SIGINT, self.signal_handler) log.info('exchange trading algorithm successfully initialized') + def _create_minute_writer(self): + root = get_exchange_minute_writer_root(self.exchange.name) + filename = os.path.join(root, 'metadata.json') + + if os.path.isfile(filename): + writer = BcolzMinuteBarWriter.open( + root, self.sim_params.end_session) + else: + writer = BcolzMinuteBarWriter( + rootdir=root, + calendar=self.trading_calendar, + minutes_per_day=1440, + start_session=self.sim_params.start_session, + end_session=self.sim_params.end_session, + write_metadata=True + ) + + self.exchange.minute_writer = writer + self.exchange.minute_reader = BcolzMinuteBarReader(root) + def signal_handler(self, signal, frame): self.is_running = False @@ -103,7 +130,6 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): execution_closes = \ self.trading_calendar.execution_time_from_close(market_closes) - signal.signal(signal.SIGINT, self.signal_handler) return ExchangeClock( self.sim_params.sessions, @@ -343,6 +369,15 @@ class ExchangeTradingAlgorithm(TradingAlgorithm): self.perf_tracker.process_order(order) return order + def round_order(self, amount): + """ + We need fractions with cryptocurrencies + + :param amount: + :return: + """ + return amount + @api_method def batch_market_order(self, share_counts): raise NotImplementedError() diff --git a/catalyst/exchange/bitfinex.py b/catalyst/exchange/bitfinex.py index 5e53733a..03184753 100644 --- a/catalyst/exchange/bitfinex.py +++ b/catalyst/exchange/bitfinex.py @@ -1,4 +1,5 @@ import base64 +import numpy as np import hashlib import hmac import json @@ -18,7 +19,6 @@ from catalyst.exchange.exchange_errors import ( ExchangeRequestError, InvalidHistoryFrequencyError ) -from catalyst.exchange.exchange_utils import get_exchange_symbols from catalyst.finance.execution import (MarketOrder, LimitOrder, StopOrder, @@ -44,9 +44,11 @@ class Bitfinex(Exchange): self.id = 'b' self.name = 'bitfinex' self.assets = {} - self.load_assets(get_exchange_symbols(self.name)) + self.load_assets() self.base_currency = base_currency self._portfolio = portfolio + self.minute_writer = None + self.minute_reader = None def _request(self, operation, data, version='v1'): payload_object = { @@ -288,6 +290,8 @@ class Bitfinex(Exchange): '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', '1M' """ + + # TODO: use BcolzMinuteBarReader to read from cache freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) if freq_match: number = int(freq_match.group(1)) @@ -347,16 +351,18 @@ class Bitfinex(Exchange): candles = response.json() def ohlc_from_candle(candle): - return dict( - open=candle[1], - high=candle[3], - low=candle[4], - close=candle[2], - volume=candle[5], - price=candle[2], + ohlc = dict( + open=np.float64(candle[1]), + high=np.float64(candle[3]), + low=np.float64(candle[4]), + close=np.float64(candle[2]), + volume=np.float64(candle[5]), + price=np.float64(candle[2]), last_traded=pd.Timestamp.utcfromtimestamp( candle[0] / 1000.0), + minute_dt=pd.Timestamp.utcnow().floor('1 min') ) + return ohlc if is_list: ohlc_bars = [] diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index e62a6496..4f8c3640 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -1,8 +1,11 @@ import abc +import random +from time import sleep import collections from abc import ABCMeta, abstractmethod, abstractproperty from datetime import timedelta +import numpy as np import pandas as pd from catalyst.assets._assets import Asset from logbook import Logger @@ -13,6 +16,7 @@ from catalyst.errors import ( ) from catalyst.finance.order import ORDER_STATUS from catalyst.finance.transaction import Transaction +from catalyst.exchange.exchange_utils import get_exchange_symbols log = Logger('Exchange') @@ -25,6 +29,8 @@ class Exchange: self.trading_pairs = None self.assets = {} self._portfolio = None + self.minute_writer = None + self.minute_reader = None @abstractmethod def subscribe_to_market_data(self, symbol): @@ -100,20 +106,7 @@ class Exchange: return asset - @staticmethod - def asset_parser(asset): - """ - Helper method to de-serialize Asset objects correctly. - - :param asset: - :return: - """ - for key in asset: - if key == 'start_date': - asset[key] = pd.to_datetime(asset[key], utc=True) - return asset - - def load_assets(self, symbol_map): + def load_assets(self): """ Populate the 'assets' attribute with a dictionary of Assets. The key of the resulting dictionary is the exchange specific @@ -121,23 +114,31 @@ class Exchange: 'symbol' attribute of each asset. - Note - ---- + Notes + ----- The sid of each asset is calculated based on a numeric hash of the universal symbol. This simple approach avoids maintaining a mapping of sids. - :param symbol_map: - :return: + This method can be overridden if an exchange offers equivalent data + via its api. """ + + symbol_map = get_exchange_symbols(self.name) for exchange_symbol in symbol_map: + asset = symbol_map[exchange_symbol] + symbol = asset['symbol'] + asset_name = ' / '.join(symbol.split('_')).upper() + asset_obj = Asset( - sid=abs(hash(symbol_map[exchange_symbol]['symbol'])) - % (10 ** 4), + symbol=symbol, + asset_name=asset_name, + sid=abs(hash(symbol)) % (10 ** 4), exchange=self.name, + start_date=pd.to_datetime(asset['start_date'], utc=True), end_date=pd.Timestamp.utcnow() + timedelta(minutes=300000), - **symbol_map[exchange_symbol] ) + self.assets[exchange_symbol] = asset_obj def check_open_orders(self): @@ -235,6 +236,13 @@ class Exchange: """ Similar to 'get_spot_value' but for a single asset + Note + ---- + We're writing each minute bar to disk using zipline's machinery. + This is especially useful when running multiple algorithms + concurrently. By using local data when possible, we try to reaching + request limits on exchanges. + :param asset: :param field: :param data_frequency: @@ -247,12 +255,53 @@ class Exchange: ) ) - ohlc = self.get_candles(data_frequency, asset) - if field not in ohlc: - raise KeyError('Invalid column: %s' % field) + if field == 'price': + field = 'close' - value = ohlc[field] - log.debug('got spot value: {}'.format(value)) + # Don't use a timezone here + dt = pd.Timestamp.utcnow().floor('1 min') + value = None + if self.minute_reader is not None: + try: + # Slight delay to minimize the chances that multiple algos + # might try to hit the cache at the exact same time. + sleep_time = random.uniform(0.5, 0.8) + sleep(sleep_time) + # TODO: This does not always! Why is that? Open an issue with zipline. + value = self.minute_reader.get_value( + sid=asset.sid, + dt=dt, + field=field + ) + except Exception as e: + log.warn('minute data not found: {}'.format(e)) + + if value is None or np.isnan(value): + ohlc = self.get_candles(data_frequency, asset) + if field not in ohlc: + raise KeyError('Invalid column: %s' % field) + + df = pd.DataFrame( + [ohlc], + index=pd.DatetimeIndex([dt]), + columns=['open', 'high', 'low', 'close', 'volume'] + ) + + if self.minute_writer is not None: + try: + self.minute_writer.write_sid( + sid=asset.sid, + df=df + ) + log.debug('wrote minute data: {}'.format(dt)) + except Exception as e: + log.warn( + 'unable to write minute data: {} {}'.format(dt, e)) + + value = ohlc[field] + log.debug('got spot value: {}'.format(value)) + else: + log.debug('got spot value from cache: {}'.format(value)) return value diff --git a/catalyst/exchange/exchange_clock.py b/catalyst/exchange/exchange_clock.py index 0c528bc0..70353884 100644 --- a/catalyst/exchange/exchange_clock.py +++ b/catalyst/exchange/exchange_clock.py @@ -58,16 +58,13 @@ class ExchangeClock(object): while True: current_time = pd.Timestamp.utcnow() - server_time = current_time.floor('1 min') + current_minute = current_time.floor('1 min') - if self._last_emit is None or server_time > self._last_emit: - log.debug('emitting minutely bar: {}'.format(server_time)) + if self._last_emit is None or current_minute > self._last_emit: + log.debug('emitting minutely bar: {}'.format(current_minute)) - self._last_emit = server_time - yield server_time, BAR - - if self.minute_emission: - yield server_time, MINUTE_END + self._last_emit = current_minute + yield current_minute, BAR else: sleep(1) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index d1aeacd5..bfd3f3c4 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -95,3 +95,12 @@ def save_algo_object(algo_name, key, obj, environ=None): with open(filename, 'wb') as handle: pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def get_exchange_minute_writer_root(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + + minute_data_folder = os.path.join(exchange_folder, 'minute_data') + ensure_directory(minute_data_folder) + + return minute_data_folder diff --git a/catalyst/exchange/symbols/bitfinex.json b/catalyst/exchange/symbols/bitfinex.json index 41bc17c9..6543b2d6 100644 --- a/catalyst/exchange/symbols/bitfinex.json +++ b/catalyst/exchange/symbols/bitfinex.json @@ -4,7 +4,7 @@ "start_date": "2010-01-01" }, "ltcusd": { - "symbol": "ltc-usd", + "symbol": "ltc_usd", "start_date": "2010-01-01" }, "ltcbtc": {