From b8d442cf892fdad1300217b73b479a54c1b78951 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Sun, 27 Aug 2017 15:19:13 -0400 Subject: [PATCH] Creating a clean branch for live trading --- .gitignore | 4 + catalyst/examples/buy_the_dip_live.py | 155 +++++ catalyst/exchange/__init__.py | 0 catalyst/exchange/algorithm_exchange.py | 437 ++++++++++++++ catalyst/exchange/asset_finder_exchange.py | 91 +++ catalyst/exchange/bitfinex/__init__.py | 0 catalyst/exchange/bitfinex/bitfinex.py | 647 +++++++++++++++++++++ catalyst/exchange/bitfinex/symbols.json | 110 ++++ catalyst/exchange/bittrex/__init__.py | 0 catalyst/exchange/data_portal_exchange.py | 121 ++++ catalyst/exchange/exchange.py | 489 ++++++++++++++++ catalyst/exchange/exchange_clock.py | 60 ++ catalyst/exchange/exchange_errors.py | 60 ++ catalyst/exchange/exchange_portfolio.py | 87 +++ catalyst/exchange/exchange_utils.py | 133 +++++ 15 files changed, 2394 insertions(+) create mode 100644 catalyst/examples/buy_the_dip_live.py create mode 100644 catalyst/exchange/__init__.py create mode 100644 catalyst/exchange/algorithm_exchange.py create mode 100644 catalyst/exchange/asset_finder_exchange.py create mode 100644 catalyst/exchange/bitfinex/__init__.py create mode 100644 catalyst/exchange/bitfinex/bitfinex.py create mode 100644 catalyst/exchange/bitfinex/symbols.json create mode 100644 catalyst/exchange/bittrex/__init__.py create mode 100644 catalyst/exchange/data_portal_exchange.py create mode 100644 catalyst/exchange/exchange.py create mode 100644 catalyst/exchange/exchange_clock.py create mode 100644 catalyst/exchange/exchange_errors.py create mode 100644 catalyst/exchange/exchange_portfolio.py create mode 100644 catalyst/exchange/exchange_utils.py diff --git a/.gitignore b/.gitignore index 7e2c83e8..ff13509b 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,7 @@ zipline.iml ./data TAGS + +python2 +python3 +scratch diff --git a/catalyst/examples/buy_the_dip_live.py b/catalyst/examples/buy_the_dip_live.py new file mode 100644 index 00000000..ff09a824 --- /dev/null +++ b/catalyst/examples/buy_the_dip_live.py @@ -0,0 +1,155 @@ +import talib +from logbook import Logger + +from catalyst.api import ( + order, + order_target_percent, + symbol, + record, + get_open_orders, +) +from catalyst.utils.run_algo import run_algorithm + +algo_namespace = 'buy_the_dip_live' +log = Logger(algo_namespace) + + +def initialize(context): + log.info('initializing algo') + context.ASSET_NAME = 'XRP_USD' + context.asset = symbol(context.ASSET_NAME) + + context.TARGET_POSITIONS = 5000 + context.PROFIT_TARGET = 0.1 + context.SLIPPAGE_ALLOWED = 0.02 + + context.retry_check_open_orders = 10 + context.retry_update_portfolio = 10 + context.retry_order = 5 + + context.errors = [] + pass + + +def _handle_data(context, data): + prices = data.history( + context.asset, + fields='price', + bar_count=20, + frequency='15m' + ) + rsi = talib.RSI(prices.values, timeperiod=14)[-1] + log.info('got rsi: {}'.format(rsi)) + + # Buying more when RSI is low, this should lower our cost basis + if rsi <= 30: + buy_increment = 50 + elif rsi <= 40: + buy_increment = 20 + elif rsi <= 70: + buy_increment = 5 + else: + buy_increment = None + + cash = context.portfolio.cash + log.info('base currency available: {cash}'.format(cash=cash)) + + price = data.current(context.asset, 'price') + log.info('got price {price}'.format(price=price)) + + record( + price=price, + rsi=rsi, + ) + + orders = get_open_orders(context.asset) + if orders: + log.info('skipping bar until all open orders execute') + return + + is_buy = False + cost_basis = None + if context.asset in context.portfolio.positions: + position = context.portfolio.positions[context.asset] + + cost_basis = position.cost_basis + log.info( + 'found {amount} positions with cost basis {cost_basis}'.format( + amount=position.amount, + cost_basis=cost_basis + ) + ) + + if position.amount >= context.TARGET_POSITIONS: + log.info('reached positions target: {}'.format(position.amount)) + return + + if price < cost_basis: + is_buy = True + elif position.amount > 0 and \ + price > cost_basis * (1 + context.PROFIT_TARGET): + profit = (price * position.amount) - (cost_basis * position.amount) + log.info('closing position, taking profit: {}'.format(profit)) + order_target_percent( + asset=context.asset, + target=0, + limit_price=price * (1 - context.SLIPPAGE_ALLOWED), + ) + else: + log.info('no buy or sell opportunity found') + else: + is_buy = True + + if is_buy: + if buy_increment is None: + log.info('the rsi is too high to consider buying {}'.format(rsi)) + return + + if price * buy_increment > cash: + log.info('not enough base currency to consider buying') + return + + log.info( + 'buying position cheaper than cost basis {} < {}'.format( + price, + cost_basis + ) + ) + order( + asset=context.asset, + amount=buy_increment, + limit_price=price * (1 + context.SLIPPAGE_ALLOWED) + ) + + +def handle_data(context, data): + log.info('handling bar {}'.format(data.current_dt)) + # try: + _handle_data(context, data) + # except Exception as e: + # log.warn('aborting the bar on error {}'.format(e)) + # context.errors.append(e) + + log.info('completed bar {}, total execution errors {}'.format( + data.current_dt, + len(context.errors) + )) + + if len(context.errors) > 0: + log.info('the errors:\n{}'.format(context.errors)) + + +def analyze(context, stats): + log.info('the full stats:\n{}'.format(stats.head())) + pass + + +run_algorithm( + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='bitfinex', + live=True, + algo_namespace=algo_namespace, + base_currency='usd' +) diff --git a/catalyst/exchange/__init__.py b/catalyst/exchange/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/algorithm_exchange.py b/catalyst/exchange/algorithm_exchange.py new file mode 100644 index 00000000..78f41645 --- /dev/null +++ b/catalyst/exchange/algorithm_exchange.py @@ -0,0 +1,437 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import signal +import sys +import pickle +from datetime import timedelta +from time import sleep +from os import listdir +from os.path import isfile, join + +import logbook +import pandas as pd + +import catalyst.protocol as zp +from catalyst.algorithm import TradingAlgorithm +from catalyst.data.minute_bars import BcolzMinuteBarWriter, \ + BcolzMinuteBarReader +from catalyst.errors import OrderInBeforeTradingStart +from catalyst.exchange.exchange_clock import ExchangeClock +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangePortfolioDataError, + ExchangeTransactionError +) +from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root, \ + save_algo_object, get_algo_object, get_algo_folder +from catalyst.finance.performance.period import calc_period_stats +from catalyst.gens.tradesimulation import AlgorithmSimulator +from catalyst.utils.api_support import ( + api_method, + disallowed_in_before_trading_start) +from catalyst.utils.input_validation import error_keywords + +log = logbook.Logger("ExchangeTradingAlgorithm") + + +class ExchangeAlgorithmExecutor(AlgorithmSimulator): + def __init__(self, *args, **kwargs): + super(self.__class__, self).__init__(*args, **kwargs) + + +class ExchangeTradingAlgorithm(TradingAlgorithm): + def __init__(self, *args, **kwargs): + self.exchange = kwargs.pop('exchange', None) + self.algo_namespace = kwargs.pop('algo_namespace', None) + self.orders = {} + self.is_running = True + + self.retry_check_open_orders = 5 + self.retry_update_portfolio = 5 + self.retry_get_open_orders = 5 + self.retry_order = 2 + self.retry_delay = 5 + + super(self.__class__, self).__init__(*args, **kwargs) + self._create_minute_writer() + + signal.signal(signal.SIGINT, self.signal_handler) + + log.info('exchange trading algorithm successfully initialized') + + def _create_minute_writer(self): + root = get_exchange_minute_writer_root(self.exchange.name) + filename = os.path.join(root, 'metadata.json') + + if os.path.isfile(filename): + writer = BcolzMinuteBarWriter.open( + root, self.sim_params.end_session) + else: + writer = BcolzMinuteBarWriter( + rootdir=root, + calendar=self.trading_calendar, + minutes_per_day=1440, + start_session=self.sim_params.start_session, + end_session=self.sim_params.end_session, + write_metadata=True + ) + + self.exchange.minute_writer = writer + self.exchange.minute_reader = BcolzMinuteBarReader(root) + + def signal_handler(self, signal, frame): + self.is_running = False + + log.info('You pressed Ctrl+C!') + + stats = None + try: + algo_folder = get_algo_folder(self.algo_namespace) + folder = join(algo_folder, 'daily_perf') + files = [f for f in listdir(folder) if isfile(join(folder, f))] + + daily_perf_list = [] + for item in files: + filename = join(folder, item) + with open(filename, 'rb') as handle: + daily_perf_list.append(pickle.load(handle)) + + stats = pd.DataFrame(daily_perf_list) + stats.set_index('period_close', drop=True, inplace=True) + + except Exception as e: + log.warn('Unable to compute daily stats: {}'.format(e)) + + self.analyze(stats) + sys.exit(0) + + def _create_clock(self): + + # The calendar's execution times are the minutes over which we actually + # want to run the clock. Typically the execution times simply adhere to + # the market open and close times. In the case of the futures calendar, + # for example, we only want to simulate over a subset of the full 24 + # hour calendar, so the execution times dictate a market open time of + # 6:31am US/Eastern and a close of 5:00pm US/Eastern. + + # In our case, we are trading around the clock, so the market close + # corresponds to the last minute of the day. + + # This method is taken from TradingAlgorithm. + # The clock has been replaced to use RealtimeClock + # TODO: should we apply a time skew? not sure to understand the utility. + return ExchangeClock( + self.sim_params.sessions, + time_skew=self.exchange.time_skew + ) + + def _create_generator(self, sim_params): + if self.perf_tracker is None: + self.perf_tracker = get_algo_object( + algo_name=self.algo_namespace, + key='perf_tracker' + ) + + # Call the simulation trading algorithm for side-effects: + # it creates the perf tracker + TradingAlgorithm._create_generator(self, sim_params) + self.trading_client = ExchangeAlgorithmExecutor( + self, + sim_params, + self.data_portal, + self._create_clock(), + self._create_benchmark_source(), + self.restrictions, + universe_func=self._calculate_universe + ) + + return self.trading_client.transform() + + def updated_portfolio(self): + """ + We skip the entire performance tracker business and update the + portfolio directly. + :return: + """ + return self.exchange.portfolio + + def updated_account(self): + return self.exchange.account + + def _update_portfolio(self, attempt_index=0): + try: + self.exchange.update_portfolio() + + # Applying the updated last_sales_price to the positions + # in the performance tracker. This seems a bit redundant + # but it will make sense when we have multiple exchange portfolios + # feeding into the same performance tracker. + tracker = self.perf_tracker.todays_performance.position_tracker + for asset in self.exchange.portfolio.positions: + position = self.exchange.portfolio.positions[asset] + tracker.update_position( + asset=asset, + last_sale_date=position.last_sale_date, + last_sale_price=position.last_sale_price + ) + except ExchangeRequestError as e: + log.warn( + 'update portfolio attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_update_portfolio: + sleep(self.retry_delay) + self._update_portfolio(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='update-portfolio', + attempts=attempt_index, + error=e + ) + + def _check_open_orders(self, attempt_index=0): + try: + return self.exchange.check_open_orders() + except ExchangeRequestError as e: + log.warn( + 'check open orders attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_check_open_orders: + sleep(self.retry_delay) + return self._check_open_orders(attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='order-status', + attempts=attempt_index, + error=e + ) + + def prepare_period_stats(self, start_dt, end_dt): + """ + Creates a dictionary representing the state of the tracker. + + + I rewrote this in an attempt to better control the stats. + I don't want things to happen magically through complex logic + pertaining to backtesting. + + """ + tracker = self.perf_tracker + period = tracker.todays_performance + + pos_stats = period.position_tracker.stats() + period_stats = calc_period_stats(pos_stats, period.ending_cash) + + stats = dict( + period_start=tracker.period_start, + period_end=tracker.period_end, + capital_base=tracker.capital_base, + progress=tracker.progress, + ending_value=period.ending_value, + ending_exposure=period.ending_exposure, + capital_used=period.cash_flow, + starting_value=period.starting_value, + starting_exposure=period.starting_exposure, + starting_cash=period.starting_cash, + ending_cash=period.ending_cash, + portfolio_value=period.ending_cash + period.ending_value, + pnl=period.pnl, + returns=period.returns, + period_open=period.period_open, + period_close=period.period_close, + gross_leverage=period_stats.gross_leverage, + net_leverage=period_stats.net_leverage, + short_exposure=pos_stats.short_exposure, + long_exposure=pos_stats.long_exposure, + short_value=pos_stats.short_value, + long_value=pos_stats.long_value, + longs_count=pos_stats.longs_count, + shorts_count=pos_stats.shorts_count, + ) + + # Merging cumulative risk + stats.update(tracker.cumulative_risk_metrics.to_dict()) + + # Merging latest recorded variables + stats.update(self.recorded_vars) + + stats['positions'] = period.position_tracker.get_positions_list() + + # we want the key to be absent, not just empty + # Only include transactions for given dt + stats['transactions'] = dict() + for date in period.processed_transactions: + if start_dt <= date < end_dt: + stats['transactions'][date] = \ + period.processed_transactions[date] + + stats['orders'] = dict() + for date in period.orders_by_modified: + if start_dt <= date < end_dt: + stats['orders'][date] = \ + period.orders_by_modified[date] + + return stats + + def handle_data(self, data): + if not self.is_running: + return + + self._update_portfolio() + + transactions = self._check_open_orders() + for transaction in transactions: + self.perf_tracker.process_transaction(transaction) + + if self._handle_data: + self._handle_data(self, data) + + # Unlike trading controls which remain constant unless placing an + # order, account controls can change each bar. Thus, must check + # every bar no matter if the algorithm places an order or not. + self.validate_account_controls() + + try: + # Since the clock runs 24/7, I trying to disable the daily + # Performance tracker and keep only minute and cumulative + self.perf_tracker.update_performance() + + # TODO: save for future use? + minute_stats = self.prepare_period_stats( + data.current_dt, data.current_dt + timedelta(minutes=1)) + log.debug('the minute performance:\n{}'.format(minute_stats)) + + today = pd.to_datetime('today', utc=True) + daily_stats = self.prepare_period_stats( + start_dt=today, + end_dt=pd.Timestamp.utcnow() + ) + save_algo_object( + algo_name=self.algo_namespace, + key=today.strftime('%Y-%m-%d'), + obj=daily_stats, + rel_path='daily_perf' + ) + + except Exception as e: + log.warn('unable to calculate performance: {}'.format(e)) + + try: + save_algo_object( + algo_name=self.algo_namespace, + key='perf_tracker', + obj=self.perf_tracker + ) + except Exception as e: + log.warn('unable to save minute perfs to disk: {}'.format(e)) + + try: + save_algo_object( + algo_name=self.algo_namespace, + key='portfolio_{}'.format(self.exchange.name), + obj=self.exchange.portfolio + ) + except Exception as e: + log.warn('unable to save portfolio to disk: {}'.format(e)) + + def _order(self, + asset, + amount, + limit_price=None, + stop_price=None, + style=None, + attempt_index=0): + try: + return self.exchange.order(asset, amount, limit_price, + stop_price, + style) + except ExchangeRequestError as e: + log.warn( + 'order attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_order: + sleep(self.retry_delay) + return self._order( + asset, amount, limit_price, stop_price, style, + attempt_index + 1) + else: + raise ExchangeTransactionError( + transaction_type='order', + attempts=attempt_index, + error=e + ) + + @api_method + @disallowed_in_before_trading_start(OrderInBeforeTradingStart()) + def order(self, + asset, + amount, + limit_price=None, + stop_price=None, + style=None): + amount, style = self._calculate_order(asset, amount, + limit_price, stop_price, + style) + + order_id = self._order(asset, amount, limit_price, stop_price, style) + order = self.portfolio.open_orders[order_id] + + self.perf_tracker.process_order(order) + return order + + def round_order(self, amount): + """ + We need fractions with cryptocurrencies + + :param amount: + :return: + """ + return amount + + @api_method + def batch_market_order(self, share_counts): + raise NotImplementedError() + + def _get_open_orders(self, asset=None, attempt_index=0): + try: + return self.exchange.get_open_orders(asset) + except ExchangeRequestError as e: + log.warn( + 'open orders attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_open_orders: + sleep(self.retry_delay) + return self._get_open_orders(asset, attempt_index + 1) + else: + raise ExchangePortfolioDataError( + data_type='open-orders', + attempts=attempt_index, + error=e + ) + + @error_keywords(sid='Keyword argument `sid` is no longer supported for ' + 'get_open_orders. Use `asset` instead.') + @api_method + def get_open_orders(self, asset=None): + return self._get_open_orders(asset) + + @api_method + def get_order(self, order_id): + return self.exchange.get_order(order_id) + + @api_method + def cancel_order(self, order_param): + order_id = order_param + if isinstance(order_param, zp.Order): + order_id = order_param.id + self.exchange.cancel_order(order_id) diff --git a/catalyst/exchange/asset_finder_exchange.py b/catalyst/exchange/asset_finder_exchange.py new file mode 100644 index 00000000..dc3d00b1 --- /dev/null +++ b/catalyst/exchange/asset_finder_exchange.py @@ -0,0 +1,91 @@ +from logbook import Logger + +log = Logger('AssetFinderExchange') + + +class AssetFinderExchange(object): + def __init__(self, exchange): + self.exchange = exchange + self._asset_cache = {} + + @property + def sids(self): + """ + This seems to be used to pre-fetch assets. + I don't think that we need this for live-trading. + Leaving the list empty. + """ + return list() + + def retrieve_all(self, sids, default_none=False): + """ + Retrieve all assets in `sids`. + + Parameters + ---------- + sids : iterable of int + Assets to retrieve. + default_none : bool + If True, return None for failed lookups. + If False, raise `SidsNotFound`. + + Returns + ------- + assets : list[Asset or None] + A list of the same length as `sids` containing Assets (or Nones) + corresponding to the requested sids. + + Raises + ------ + SidsNotFound + When a requested sid is not found and default_none=False. + """ + for sid in sids: + if sid in self._asset_cache: + log.info('got asset from cache: {}'.format(sid)) + else: + log.info('fetching asset: {}'.format(sid)) + return list() + + def lookup_symbol(self, symbol, as_of_date, fuzzy=False): + """Lookup an asset by symbol. + + Parameters + ---------- + symbol : str + The ticker symbol to resolve. + as_of_date : datetime or None + Look up the last owner of this symbol as of this datetime. + If ``as_of_date`` is None, then this can only resolve the equity + if exactly one equity has ever owned the ticker. + fuzzy : bool, optional + Should fuzzy symbol matching be used? Fuzzy symbol matching + attempts to resolve differences in representations for + shareclasses. For example, some people may represent the ``A`` + shareclass of ``BRK`` as ``BRK.A``, where others could write + ``BRK_A``. + + Returns + ------- + equity : Asset + The equity that held ``symbol`` on the given ``as_of_date``, or the + only equity to hold ``symbol`` if ``as_of_date`` is None. + + Raises + ------ + SymbolNotFound + Raised when no equity has ever held the given symbol. + MultipleSymbolsFound + Raised when no ``as_of_date`` is given and more than one equity + has held ``symbol``. This is also raised when ``fuzzy=True`` and + there are multiple candidates for the given ``symbol`` on the + ``as_of_date``. + """ + log.info('looking up symbol: {}'.format(symbol)) + + if symbol in self._asset_cache: + return self._asset_cache[symbol] + else: + asset = self.exchange.get_asset(symbol) + self._asset_cache[symbol] = asset + return asset diff --git a/catalyst/exchange/bitfinex/__init__.py b/catalyst/exchange/bitfinex/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/bitfinex/bitfinex.py b/catalyst/exchange/bitfinex/bitfinex.py new file mode 100644 index 00000000..03184753 --- /dev/null +++ b/catalyst/exchange/bitfinex/bitfinex.py @@ -0,0 +1,647 @@ +import base64 +import numpy as np +import hashlib +import hmac +import json +import re +import time + +import pandas as pd +import pytz +import requests +import six +from catalyst.assets._assets import Asset +from logbook import Logger + +# from websocket import create_connection +from catalyst.exchange.exchange import Exchange +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + InvalidHistoryFrequencyError +) +from catalyst.finance.execution import (MarketOrder, + LimitOrder, + StopOrder, + StopLimitOrder) +from catalyst.finance.order import Order, ORDER_STATUS +from catalyst.protocol import Account + +# Trying to account for REST api instability +# https://stackoverflow.com/questions/15431044/can-i-set-max-retries-for-requests-request +requests.adapters.DEFAULT_RETRIES = 20 + +BITFINEX_URL = 'https://api.bitfinex.com' + +log = Logger('Bitfinex') +warning_logger = Logger('AlgoWarning') + + +class Bitfinex(Exchange): + def __init__(self, key, secret, base_currency, portfolio=None): + self.url = BITFINEX_URL + self.key = key + self.secret = secret + self.id = 'b' + self.name = 'bitfinex' + self.assets = {} + self.load_assets() + self.base_currency = base_currency + self._portfolio = portfolio + self.minute_writer = None + self.minute_reader = None + + def _request(self, operation, data, version='v1'): + payload_object = { + 'request': '/{}/{}'.format(version, operation), + 'nonce': '{0:f}'.format(time.time() * 1000000), + # convert to string + 'options': {} + } + + if data is None: + payload_dict = payload_object + else: + payload_dict = payload_object.copy() + payload_dict.update(data) + + payload_json = json.dumps(payload_dict) + if six.PY3: + payload = base64.b64encode(bytes(payload_json, 'utf-8')) + else: + payload = base64.b64encode(payload_json) + + m = hmac.new(self.secret, payload, hashlib.sha384) + m = m.hexdigest() + + # headers + headers = { + 'X-BFX-APIKEY': self.key, + 'X-BFX-PAYLOAD': payload, + 'X-BFX-SIGNATURE': m + } + + if data is None: + request = requests.get( + '{url}/{version}/{operation}'.format( + url=self.url, + version=version, + operation=operation + ), data={}, + headers=headers) + else: + request = requests.post( + '{url}/{version}/{operation}'.format( + url=self.url, + version=version, + operation=operation + ), + headers=headers) + + return request + + def _get_v2_symbol(self, asset): + pair = asset.symbol.split('_') + symbol = 't' + pair[0].upper() + pair[1].upper() + return symbol + + def _get_v2_symbols(self, assets): + """ + Workaround to support Bitfinex v2 + TODO: Might require a separate asset dictionary + + :param assets: + :return: + """ + + v2_symbols = [] + for asset in assets: + v2_symbols.append(self._get_v2_symbol(asset)) + + return v2_symbols + + def _create_order(self, order_status): + """ + Create a Catalyst order object from a Bitfinex order dictionary + :param order_status: + :return: Order + """ + if order_status['is_cancelled']: + status = ORDER_STATUS.CANCELLED + elif not order_status['is_live']: + log.info('found executed order {}'.format(order_status)) + status = ORDER_STATUS.FILLED + else: + status = ORDER_STATUS.OPEN + + amount = float(order_status['original_amount']) + filled = float(order_status['executed_amount']) + is_buy = (amount > 0) + + price = float(order_status['price']) + order_type = order_status['type'] + + stop_price = None + limit_price = None + + # TODO: is this comprehensive enough? + if order_type.endswith('limit'): + limit_price = price + elif order_type.endswith('stop'): + stop_price = price + + executed_price = float(order_status['avg_execution_price']) + + # TODO: bitfinex does not specify comission. I could calculate it but not sure if it's worth it. + commission = None + + # TODO: zipline likes rounded dates to match statistics, is this ok? + date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp'])) + date = pytz.utc.localize(date) + order = Order( + dt=date, + asset=self.assets[order_status['symbol']], + amount=amount, + stop=stop_price, + limit=limit_price, + filled=filled, + id=order_status['id'], + commission=commission + ) + order.status = status + + return order, executed_price + + def update_portfolio(self): + """ + Update the portfolio cash and position balances based on the + latest ticker prices. + + :return: + """ + try: + response = self._request('balances', None) + balances = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in balances: + raise ExchangeRequestError( + error='unable to fetch balance {}'.format(balances['message']) + ) + + base_position = None + for position in balances: + if not base_position and position['type'] == 'exchange' \ + and position['currency'] == self.base_currency: + base_position = position + + if position is None: + raise ValueError( + error='Base currency %s not found in portfolio' % self.base_currency + ) + + portfolio = self._portfolio + portfolio.cash = float(base_position['available']) + if portfolio.starting_cash is None: + portfolio.starting_cash = portfolio.cash + + if portfolio.positions: + assets = portfolio.positions.keys() + tickers = self.tickers(assets) + portfolio.positions_value = 0.0 + for ticker in tickers: + # TODO: convert if the position is not in the base currency + position = portfolio.positions[ticker['asset']] + position.last_sale_price = ticker['last_price'] + position.last_sale_date = ticker['timestamp'] + + portfolio.positions_value += \ + position.amount * position.last_sale_price + portfolio.portfolio_value = \ + portfolio.positions_value + portfolio.cash + + @property + def portfolio(self): + """ + Return the Portfolio + + :return: + """ + # if self._portfolio is None: + # portfolio = ExchangePortfolio( + # start_date=pd.Timestamp.utcnow() + # ) + # self.store.portfolio = portfolio + # self.update_portfolio() + # + # portfolio.starting_cash = portfolio.cash + # else: + # portfolio = self.store.portfolio + + return self._portfolio + + @property + def account(self): + account = Account() + + account.settled_cash = None + account.accrued_interest = None + account.buying_power = None + account.equity_with_loan = None + account.total_positions_value = None + account.total_positions_exposure = None + account.regt_equity = None + account.regt_margin = None + account.initial_margin_requirement = None + account.maintenance_margin_requirement = None + account.available_funds = None + account.excess_liquidity = None + account.cushion = None + account.day_trades_remaining = None + account.leverage = None + account.net_leverage = None + account.net_liquidation = None + + return account + + @property + def positions(self): + return self.portfolio.positions + + @property + def time_skew(self): + # TODO: research the time skew conditions + return pd.Timedelta('0s') + + def subscribe_to_market_data(self, symbol): + pass + + def get_candles(self, data_frequency, assets, bar_count=None): + """ + Retrieve OHLVC candles from Bitfinex + + :param data_frequency: + :param assets: + :param bar_count: + :return: + + Available Frequencies + --------------------- + '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D', + '1M' + """ + + # TODO: use BcolzMinuteBarReader to read from cache + freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I) + if freq_match: + number = int(freq_match.group(1)) + unit = freq_match.group(2) + + if unit == 'd': + converted_unit = 'D' + else: + converted_unit = unit + + frequency = '{}{}'.format(number, converted_unit) + allowed_frequencies = ['1m', '5m', '15m', '30m', '1h', '3h', '6h', + '12h', '1D', '7D', '14D', '1M'] + + if frequency not in allowed_frequencies: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + elif data_frequency == 'minute': + frequency = '1m' + elif data_frequency == 'daily': + frequency = '1D' + else: + raise InvalidHistoryFrequencyError( + frequency=data_frequency + ) + + # Making sure that assets are iterable + asset_list = [assets] if isinstance(assets, Asset) else assets + ohlc_list = dict() + for asset in asset_list: + symbol = self._get_v2_symbol(asset) + url = '{url}/v2/candles/trade:{frequency}:{symbol}'.format( + url=self.url, + frequency=frequency, + symbol=symbol + ) + + if bar_count: + is_list = True + url += '/hist?limit={}'.format(int(bar_count)) + else: + is_list = False + url += '/last' + + try: + response = requests.get(url) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve candles: {}'.format( + response.content) + ) + + candles = response.json() + + def ohlc_from_candle(candle): + ohlc = dict( + open=np.float64(candle[1]), + high=np.float64(candle[3]), + low=np.float64(candle[4]), + close=np.float64(candle[2]), + volume=np.float64(candle[5]), + price=np.float64(candle[2]), + last_traded=pd.Timestamp.utcfromtimestamp( + candle[0] / 1000.0), + minute_dt=pd.Timestamp.utcnow().floor('1 min') + ) + return ohlc + + if is_list: + ohlc_bars = [] + # We can to list candles from old to new + for candle in reversed(candles): + ohlc = ohlc_from_candle(candle) + ohlc_bars.append(ohlc) + + ohlc_list[asset] = ohlc_bars + + else: + ohlc = ohlc_from_candle(candles) + ohlc_list[asset] = ohlc + + return ohlc_list[assets] \ + if isinstance(assets, Asset) else ohlc_list + + def order(self, asset, amount, limit_price, stop_price, style): + """Place an order. + + Parameters + ---------- + asset : Asset + The asset that this order is for. + amount : int + The amount of shares to order. If ``amount`` is positive, this is + the number of shares to buy or cover. If ``amount`` is negative, + this is the number of shares to sell or short. + limit_price : float, optional + The limit price for the order. + stop_price : float, optional + The stop price for the order. + style : ExecutionStyle, optional + The execution style for the order. + + Returns + ------- + order_id : str or None + The unique identifier for this order, or None if no order was + placed. + + Notes + ----- + The ``limit_price`` and ``stop_price`` arguments provide shorthands for + passing common execution styles. Passing ``limit_price=N`` is + equivalent to ``style=LimitOrder(N)``. Similarly, passing + ``stop_price=M`` is equivalent to ``style=StopOrder(M)``, and passing + ``limit_price=N`` and ``stop_price=M`` is equivalent to + ``style=StopLimitOrder(N, M)``. It is an error to pass both a ``style`` + and ``limit_price`` or ``stop_price``. + + Bitfinex Order Types + -------------------- + LIMIT, MARKET, STOP, TRAILING STOP, + EXCHANGE MARKET, EXCHANGE LIMIT, EXCHANGE STOP, + EXCHANGE TRAILING STOP, FOK, EXCHANGE FOK. + + See Also + -------- + :class:`catalyst.finance.execution.ExecutionStyle` + :func:`catalyst.api.order_value` + :func:`catalyst.api.order_percent` + """ + if amount == 0: + log.warn('skipping order amount of 0') + return None + + base_currency = asset.symbol.split('_')[1] + if base_currency.lower() != self.base_currency.lower(): + raise NotImplementedError( + 'Currency pairs must share their base with the exchange.' + ) + + is_buy = (amount > 0) + + if isinstance(style, MarketOrder): + order_type = 'market' + elif isinstance(style, LimitOrder): + order_type = 'limit' + price = limit_price + elif isinstance(style, StopOrder): + order_type = 'stop' + price = stop_price + elif isinstance(style, StopLimitOrder): + log.warn('using limit order instead of stop/limit') + # TODO: Not sure how to do this with the api. Investigate. + order_type = 'limit' + price = limit_price + else: + raise NotImplementedError('%s orders not available' % style) + + log.debug( + 'ordering {amount} {symbol} for {price}'.format( + amount=amount, + symbol=asset.symbol, + price=price + ) + ) + + exchange_symbol = self.get_symbol(asset) + req = dict( + symbol=exchange_symbol, + amount=str(float(abs(amount))), + price=str(float(price)), + side='buy' if is_buy else 'sell', + type='exchange ' + order_type, # TODO: support margin trades + exchange=self.name, + is_hidden=False, + is_postonly=False, + use_all_available=0, + ocoorder=False, + buy_price_oco=0, + sell_price_oco=0 + ) + + date = pd.Timestamp.utcnow() + try: + response = self._request('order/new', req) + exchange_order = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in exchange_order: + raise ExchangeRequestError( + error='unable to create Bitfinex order {}'.format( + exchange_order['message']) + ) + + order_id = exchange_order['id'] + order = Order( + dt=date, + asset=asset, + amount=amount, + stop=style.get_stop_price(is_buy), + limit=style.get_limit_price(is_buy), + id=order_id + ) + # TODO: is this required? + order.broker_order_id = order_id + + self.portfolio.create_order(order) + + return order_id + + def get_open_orders(self, asset=None): + """Retrieve all of the current open orders. + + Parameters + ---------- + asset : Asset + If passed and not None, return only the open orders for the given + asset instead of all open orders. + + Returns + ------- + open_orders : dict[list[Order]] or list[Order] + If no asset is passed this will return a dict mapping Assets + to a list containing all the open orders for the asset. + If an asset is passed then this will return a list of the open + orders for this asset. + """ + try: + response = self._request('orders', None) + order_statuses = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_statuses: + raise ExchangeRequestError( + error='Unable to retrieve open orders: {}'.format( + order_statuses['message']) + ) + + orders = list() + for order_status in order_statuses: + order, = self._create_order(order_status) + if asset is None or asset == order.sid: + orders.append(order) + + return orders + + def get_order(self, order_id): + """Lookup an order based on the order id returned from one of the + order functions. + + Parameters + ---------- + order_id : str + The unique identifier for the order. + + Returns + ------- + order : Order + The order object. + """ + try: + response = self._request( + 'order/status', {'order_id': int(order_id)}) + order_status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in order_status: + raise ExchangeRequestError( + error='Unable to retrieve order status: {}'.format( + order_status['message']) + ) + return self._create_order(order_status) + + def cancel_order(self, order_param): + """Cancel an open order. + + Parameters + ---------- + order_param : str or Order + The order_id or order object to cancel. + """ + order_id = order_param.id \ + if isinstance(order_param, Order) else order_param + + try: + response = self._request('order/cancel', {'order_id': order_id}) + status = response.json() + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'message' in status: + raise ExchangeRequestError( + error='Unable to cancel order: {} {}'.format( + order_id, status['message']) + ) + + def tickers(self, assets): + """ + Fetch ticket data for assets + https://docs.bitfinex.com/v2/reference#rest-public-tickers + + :param assets: + :return: + """ + symbols = self._get_v2_symbols(assets) + log.debug('fetching tickers {}'.format(symbols)) + + try: + response = requests.get( + '{url}/v2/tickers?symbols={symbols}'.format( + url=self.url, + symbols=','.join(symbols), + ) + ) + except Exception as e: + raise ExchangeRequestError(error=e) + + if 'error' in response.content: + raise ExchangeRequestError( + error='Unable to retrieve tickers: {}'.format( + response.content) + ) + + tickers = response.json() + + formatted_tickers = [] + for index, ticker in enumerate(tickers): + if not len(ticker) == 11: + raise ExchangeRequestError( + error='Invalid ticker in response: {}'.format(ticker) + ) + + tick = dict( + asset=assets[index], + timestamp=pd.Timestamp.utcnow(), + bid=ticker[1], + ask=ticker[3], + last_price=ticker[7], + low=ticker[10], + high=ticker[9], + volume=ticker[8], + ) + formatted_tickers.append(tick) + + log.debug('got tickers {}'.format(formatted_tickers)) + return formatted_tickers diff --git a/catalyst/exchange/bitfinex/symbols.json b/catalyst/exchange/bitfinex/symbols.json new file mode 100644 index 00000000..6543b2d6 --- /dev/null +++ b/catalyst/exchange/bitfinex/symbols.json @@ -0,0 +1,110 @@ +{ + "btcusd": { + "symbol": "btc_usd", + "start_date": "2010-01-01" + }, + "ltcusd": { + "symbol": "ltc_usd", + "start_date": "2010-01-01" + }, + "ltcbtc": { + "symbol": "ltc_btc", + "start_date": "2010-01-01" + }, + "ethusd": { + "symbol": "eth_usd", + "start_date": "2010-01-01" + }, + "ethbtc": { + "symbol": "eth_btc", + "start_date": "2010-01-01" + }, + "etcbtc": { + "symbol": "etc_btc", + "start_date": "2010-01-01" + }, + "etcusd": { + "symbol": "etc_usd", + "start_date": "2010-01-01" + }, + "rrtusd": { + "symbol": "rrt_usd", + "start_date": "2010-01-01" + }, + "rrtbtc": { + "symbol": "rrt_btc", + "start_date": "2010-01-01" + }, + "zecusd": { + "symbol": "zec_usd", + "start_date": "2010-01-01" + }, + "zecbtc": { + "symbol": "zec_btc", + "start_date": "2010-01-01" + }, + "xmrusd": { + "symbol": "xmr_usd", + "start_date": "2010-01-01" + }, + "xmrbtc": { + "symbol": "xmr_btc", + "start_date": "2010-01-01" + }, + "dshusd": { + "symbol": "dsh_usd", + "start_date": "2010-01-01" + }, + "dshbtc": { + "symbol": "dsh_btc", + "start_date": "2010-01-01" + }, + "bccbtc": { + "symbol": "bcc_btc", + "start_date": "2010-01-01" + }, + "bcubtc": { + "symbol": "bcu_btc", + "start_date": "2010-01-01" + }, + "bccusd": { + "symbol": "bcc_usd", + "start_date": "2010-01-01" + }, + "bcuusd": { + "symbol": "bcu_usd", + "start_date": "2010-01-01" + }, + "xrpusd": { + "symbol": "xrp_usd", + "start_date": "2010-01-01" + }, + "xrpbtc": { + "symbol": "xrp_btc", + "start_date": "2010-01-01" + }, + "iotusd": { + "symbol": "iot_usd", + "start_date": "2010-01-01" + }, + "iotbtc": { + "symbol": "iot_btc", + "start_date": "2010-01-01" + }, + "ioteth": { + "symbol": "iot_eth", + "start_date": "2010-01-01" + }, + "eosusd": { + "symbol": "eos_usd", + "start_date": "2010-01-01" + }, + "eosbtc": { + "symbol": "eos_btc", + "start_date": "2010-01-01" + }, + "eoseth": { + "symbol": "eos_eth", + "start_date": "2010-01-01" + } +} \ No newline at end of file diff --git a/catalyst/exchange/bittrex/__init__.py b/catalyst/exchange/bittrex/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py new file mode 100644 index 00000000..77a7cb76 --- /dev/null +++ b/catalyst/exchange/data_portal_exchange.py @@ -0,0 +1,121 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +from logbook import Logger + +from catalyst.data.data_portal import DataPortal +from catalyst.exchange.exchange_errors import ( + ExchangeRequestError, + ExchangeBarDataError +) + +log = Logger('DataPortalExchange') + + +class DataPortalExchange(DataPortal): + def __init__(self, exchange, *args, **kwargs): + self.exchange = exchange + + # TODO: put somewhere accessible by each algo + self.retry_get_history_window = 5 + self.retry_get_spot_value = 5 + self.retry_delay = 5 + + super(DataPortalExchange, self).__init__(*args, **kwargs) + + def _get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True, + attempt_index=0): + try: + return self.exchange.get_history_window( + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + except ExchangeRequestError as e: + log.warn( + 'get history attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_history_window: + sleep(self.retry_delay) + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='history', + attempts=attempt_index, + error=e + ) + + def get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True): + return self._get_history_window(assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill) + + def _get_spot_value(self, assets, field, dt, data_frequency, + attempt_index=0): + try: + return self.exchange.get_spot_value(assets, field, dt, + data_frequency) + except ExchangeRequestError as e: + log.warn( + 'get spot value attempt {}: {}'.format(attempt_index, e) + ) + if attempt_index < self.retry_get_spot_value: + sleep(self.retry_delay) + return self._get_spot_value(assets, field, dt, data_frequency, + attempt_index + 1) + else: + raise ExchangeBarDataError( + data_type='spot', + attempts=attempt_index, + error=e + ) + + def get_spot_value(self, assets, field, dt, data_frequency): + return self._get_spot_value(assets, field, dt, data_frequency) + + def get_adjusted_value(self, asset, field, dt, + perspective_dt, + data_frequency, + spot_value=None): + # TODO: does this pertain to cryptocurrencies? + raise NotImplementedError("get_adjusted_value is not implemented yet!") diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py new file mode 100644 index 00000000..649b45a3 --- /dev/null +++ b/catalyst/exchange/exchange.py @@ -0,0 +1,489 @@ +import abc +import random +from time import sleep +import collections +from abc import ABCMeta, abstractmethod, abstractproperty +from datetime import timedelta + +import numpy as np +import pandas as pd +from catalyst.assets._assets import Asset +from logbook import Logger + +from catalyst.data.data_portal import BASE_FIELDS +from catalyst.errors import ( + SymbolNotFound, +) +from catalyst.finance.order import ORDER_STATUS +from catalyst.finance.transaction import Transaction +from catalyst.exchange.exchange_utils import get_exchange_symbols + +log = Logger('Exchange') + + +class Exchange: + __metaclass__ = ABCMeta + + def __init__(self): + self.name = None + self.trading_pairs = None + self.assets = {} + self._portfolio = None + self.minute_writer = None + self.minute_reader = None + + @abstractmethod + def subscribe_to_market_data(self, symbol): + pass + + @abstractproperty + def positions(self): + pass + + @abstractproperty + def update_portfolio(self): + pass + + @abstractproperty + def portfolio(self): + pass + + @abstractproperty + def account(self): + pass + + @abstractproperty + def time_skew(self): + pass + + def get_symbol(self, asset): + """ + Get the exchange specific symbol of the given asset. + + :param asset: Asset + :return: symbol: str + """ + symbol = None + + for key in self.assets: + if not symbol and self.assets[key].symbol == asset.symbol: + symbol = key + + if not symbol: + raise ValueError('Currency %s not supported by exchange %s' % + (asset['symbol'], self.name)) + + return symbol + + def get_symbols(self, assets): + """ + Get a list of symbols corresponding to each given asset. + + :param assets: Asset[] + :return: + """ + symbols = [] + + for asset in assets: + symbols.append(self.get_symbol(asset)) + + return symbols + + def get_asset(self, symbol): + """ + Find an Asset on the current exchange based on its Catalyst symbol + :param symbol: the [target]_[base] currency pair symbol + :return: Asset + """ + asset = None + + for key in self.assets: + if not asset and self.assets[key].symbol.lower() == symbol.lower(): + asset = self.assets[key] + + if not asset: + raise SymbolNotFound('Asset not found: %s' % symbol) + + return asset + + def load_assets(self): + """ + Populate the 'assets' attribute with a dictionary of Assets. + The key of the resulting dictionary is the exchange specific + currency pair symbol. The universal symbol is contained in the + 'symbol' attribute of each asset. + + + Notes + ----- + The sid of each asset is calculated based on a numeric hash of the + universal symbol. This simple approach avoids maintaining a mapping + of sids. + + This method can be overridden if an exchange offers equivalent data + via its api. + """ + + symbol_map = get_exchange_symbols(self.name) + for exchange_symbol in symbol_map: + asset = symbol_map[exchange_symbol] + symbol = asset['symbol'] + asset_name = ' / '.join(symbol.split('_')).upper() + + asset_obj = Asset( + symbol=symbol, + asset_name=asset_name, + sid=abs(hash(symbol)) % (10 ** 4), + exchange=self.name, + start_date=pd.to_datetime(asset['start_date'], utc=True), + end_date=pd.Timestamp.utcnow() + timedelta(minutes=300000), + ) + + self.assets[exchange_symbol] = asset_obj + + def check_open_orders(self): + """ + Loop through the list of open orders in the Portfolio object. + For each executed order found, create a transaction and apply to the + Portfolio. + + :return: + transactions: Transaction[] + """ + transactions = list() + if self.portfolio.open_orders: + for order_id in list(self.portfolio.open_orders): + log.debug('found open order: {}'.format(order_id)) + + order, executed_price = self.get_order(order_id) + log.debug('got updated order {} {}'.format( + order, executed_price)) + + if order.status == ORDER_STATUS.FILLED: + transaction = Transaction( + asset=order.asset, + amount=order.amount, + dt=pd.Timestamp.utcnow(), + price=executed_price, + order_id=order.id, + commission=order.commission + ) + transactions.append(transaction) + + self.portfolio.execute_order(order, transaction) + + elif order.status == ORDER_STATUS.CANCELLED: + self.portfolio.remove_order(order) + + else: + delta = pd.Timestamp.utcnow() - order.dt + log.info( + 'order {order_id} still open after {delta}'.format( + order_id=order_id, + delta=delta + ) + ) + return transactions + + def get_spot_value(self, assets, field, dt=None, data_frequency='minute'): + """ + Public API method that returns a scalar value representing the value + of the desired asset's field at either the given dt. + + Parameters + ---------- + assets : Asset, ContinuousFuture, or iterable of same. + The asset or assets whose data is desired. + field : {'open', 'high', 'low', 'close', 'volume', + 'price', 'last_traded'} + The desired field of the asset. + dt : pd.Timestamp + The timestamp for the desired value. + data_frequency : str + The frequency of the data to query; i.e. whether the data is + 'daily' or 'minute' bars + + Returns + ------- + value : float, int, or pd.Timestamp + The spot value of ``field`` for ``asset`` The return type is based + on the ``field`` requested. If the field is one of 'open', 'high', + 'low', 'close', or 'price', the value will be a float. If the + ``field`` is 'volume' the value will be a int. If the ``field`` is + 'last_traded' the value will be a Timestamp. + + Bitfinex timeframes + ------------------- + Available values: '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', + '1D', '7D', '14D', '1M' + """ + if field not in BASE_FIELDS: + raise KeyError('Invalid column: ' + str(field)) + + if isinstance(assets, collections.Iterable): + values = list() + for asset in assets: + value = self.get_single_spot_value( + asset, field, data_frequency) + values.append(value) + + return values + else: + return self.get_single_spot_value( + assets, field, data_frequency) + + def get_single_spot_value(self, asset, field, data_frequency): + """ + Similar to 'get_spot_value' but for a single asset + + Note + ---- + We're writing each minute bar to disk using zipline's machinery. + This is especially useful when running multiple algorithms + concurrently. By using local data when possible, we try to reaching + request limits on exchanges. + + :param asset: + :param field: + :param data_frequency: + :return value: The spot value of the given asset / field + """ + log.debug( + 'fetching spot value {field} for symbol {symbol}'.format( + symbol=asset.symbol, + field=field + ) + ) + + if field == 'price': + field = 'close' + + # Don't use a timezone here + dt = pd.Timestamp.utcnow().floor('1 min') + value = None + if self.minute_reader is not None: + try: + # Slight delay to minimize the chances that multiple algos + # might try to hit the cache at the exact same time. + sleep_time = random.uniform(0.5, 0.8) + sleep(sleep_time) + # TODO: This does not always! Why is that? Open an issue with zipline. + # See: https://github.com/zipline-live/zipline/issues/26 + value = self.minute_reader.get_value( + sid=asset.sid, + dt=dt, + field=field + ) + except Exception as e: + log.warn('minute data not found: {}'.format(e)) + + if value is None or np.isnan(value): + ohlc = self.get_candles(data_frequency, asset) + if field not in ohlc: + raise KeyError('Invalid column: %s' % field) + + if self.minute_writer is not None: + df = pd.DataFrame( + [ohlc], + index=pd.DatetimeIndex([dt]), + columns=['open', 'high', 'low', 'close', 'volume'] + ) + + try: + self.minute_writer.write_sid( + sid=asset.sid, + df=df + ) + log.debug('wrote minute data: {}'.format(dt)) + except Exception as e: + log.warn( + 'unable to write minute data: {} {}'.format(dt, e)) + + value = ohlc[field] + log.debug('got spot value: {}'.format(value)) + else: + log.debug('got spot value from cache: {}'.format(value)) + + return value + + def get_history_window(self, + assets, + end_dt, + bar_count, + frequency, + field, + data_frequency, + ffill=True): + + """ + Public API method that returns a dataframe containing the requested + history window. Data is fully adjusted. + + Parameters + ---------- + assets : list of catalyst.data.Asset objects + The assets whose data is desired. + + end_dt: not applicable to cryptocurrencies + + bar_count: int + The number of bars desired. + + frequency: string + "1d" or "1m" + + field: string + The desired field of the asset. + + data_frequency: string + The frequency of the data to query; i.e. whether the data is + 'daily' or 'minute' bars. + + # TODO: fill how? + ffill: boolean + Forward-fill missing values. Only has effect if field + is 'price'. + + Returns + ------- + A dataframe containing the requested data. + """ + + candles = self.get_candles( + data_frequency=frequency, + assets=assets, + bar_count=bar_count, + ) + + frames = [] + for asset in assets: + asset_candles = candles[asset] + + asset_data = dict() + asset_data[asset] = map(lambda candle: candle[field], + asset_candles) + + dates = map(lambda candle: candle['last_traded'], + asset_candles) + + df = pd.DataFrame(asset_data, index=dates) + frames.append(df) + + return pd.concat(frames) + + @abstractmethod + def order(self, asset, amount, limit_price, stop_price, style): + """Place an order. + + Parameters + ---------- + asset : Asset + The asset that this order is for. + amount : int + The amount of shares to order. If ``amount`` is positive, this is + the number of shares to buy or cover. If ``amount`` is negative, + this is the number of shares to sell or short. + limit_price : float, optional + The limit price for the order. + stop_price : float, optional + The stop price for the order. + style : ExecutionStyle, optional + The execution style for the order. + + Returns + ------- + order_id : str or None + The unique identifier for this order, or None if no order was + placed. + + Notes + ----- + The ``limit_price`` and ``stop_price`` arguments provide shorthands for + passing common execution styles. Passing ``limit_price=N`` is + equivalent to ``style=LimitOrder(N)``. Similarly, passing + ``stop_price=M`` is equivalent to ``style=StopOrder(M)``, and passing + ``limit_price=N`` and ``stop_price=M`` is equivalent to + ``style=StopLimitOrder(N, M)``. It is an error to pass both a ``style`` + and ``limit_price`` or ``stop_price``. + + See Also + -------- + :class:`catalyst.finance.execution.ExecutionStyle` + :func:`catalyst.api.order_value` + :func:`catalyst.api.order_percent` + """ + pass + + @abstractmethod + def get_open_orders(self, asset): + """Retrieve all of the current open orders. + + Parameters + ---------- + asset : Asset + If passed and not None, return only the open orders for the given + asset instead of all open orders. + + Returns + ------- + open_orders : dict[list[Order]] or list[Order] + If no asset is passed this will return a dict mapping Assets + to a list containing all the open orders for the asset. + If an asset is passed then this will return a list of the open + orders for this asset. + """ + pass + + @abstractmethod + def get_order(self, order_id): + """Lookup an order based on the order id returned from one of the + order functions. + + Parameters + ---------- + order_id : str + The unique identifier for the order. + + Returns + ------- + order : Order + The order object. + execution_price: float + The execution price per share of the order + """ + pass + + @abstractmethod + def cancel_order(self, order_param): + """Cancel an open order. + + Parameters + ---------- + order_param : str or Order + The order_id or order object to cancel. + """ + pass + + @abstractmethod + def get_candles(self, data_frequency, assets, bar_count=None): + """ + Retrieve OHLCV candles for the given assets + + :param data_frequency: + :param assets: + :param end_dt: + :param bar_count: + :param limit: + :return: + """ + pass + + @abc.abstractmethod + def tickers(self, assets): + """ + Retrieve current tick data for the given assets + + :param assets: + :return: + """ + return diff --git a/catalyst/exchange/exchange_clock.py b/catalyst/exchange/exchange_clock.py new file mode 100644 index 00000000..4e180816 --- /dev/null +++ b/catalyst/exchange/exchange_clock.py @@ -0,0 +1,60 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +import pandas as pd +from catalyst.gens.sim_engine import ( + BAR, + SESSION_START, + MINUTE_END, + SESSION_END +) +from logbook import Logger + +log = Logger('ExchangeClock') + + +class ExchangeClock(object): + """Realtime clock for live trading. + + This class is a drop-in replacement for + :class:`zipline.gens.sim_engine.MinuteSimulationClock`. + + This is a stripped down version because crypto exchanges run around the clock. + + The :param:`time_skew` parameter represents the time difference between + the Broker and the live trading machine's clock. + """ + + def __init__(self, sessions, time_skew=pd.Timedelta("0s")): + + self.sessions = sessions + self.time_skew = time_skew + self._last_emit = None + self._before_trading_start_bar_yielded = True + + def __iter__(self): + yield pd.Timestamp.utcnow(), SESSION_START + + while True: + current_time = pd.Timestamp.utcnow() + current_minute = current_time.floor('1 min') + + if self._last_emit is None or current_minute > self._last_emit: + log.debug('emitting minutely bar: {}'.format(current_minute)) + + self._last_emit = current_minute + yield current_minute, BAR + else: + sleep(1) diff --git a/catalyst/exchange/exchange_errors.py b/catalyst/exchange/exchange_errors.py new file mode 100644 index 00000000..14b06111 --- /dev/null +++ b/catalyst/exchange/exchange_errors.py @@ -0,0 +1,60 @@ +from catalyst.errors import ZiplineError + + +class ExchangeRequestError(ZiplineError): + msg = ( + 'Request failed: {error}' + ).strip() + + +class ExchangeRequestErrorTooManyAttempts(ZiplineError): + msg = ( + 'Request failed: {error}, giving up after {attempts} attempts' + ).strip() + + +class ExchangeBarDataError(ZiplineError): + msg = ( + 'Unable to retrieve bar data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangePortfolioDataError(ZiplineError): + msg = ( + 'Unable to retrieve portfolio data: {data_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangeTransactionError(ZiplineError): + msg = ( + 'Unable to execute transaction: {transaction_type}, ' + + 'giving up after {attempts} attempts: {error}' + ).strip() + + +class ExchangeAuthNotFound(ZiplineError): + msg = ( + 'Please create an auth.json file containing the api token and key for ' + 'exchange {exchange}. Place the file here: {filename}' + ).strip() + + +class ExchangeSymbolsNotFound(ZiplineError): + msg = ( + 'Unable to download or find a local copy of symbols.json for exchange ' + '{exchange}. The file should be here: {filename}' + ).strip() + + +class AlgoPickleNotFound(ZiplineError): + msg = ( + 'Pickle not found for algo {algo} in path {filename}' + ).strip() + + +class InvalidHistoryFrequencyError(ZiplineError): + msg = ( + 'History frequency {frequency} not supported by the exchange.' + ).strip() diff --git a/catalyst/exchange/exchange_portfolio.py b/catalyst/exchange/exchange_portfolio.py new file mode 100644 index 00000000..ded8a2a4 --- /dev/null +++ b/catalyst/exchange/exchange_portfolio.py @@ -0,0 +1,87 @@ +import numpy as np +from logbook import Logger + +from catalyst.protocol import Portfolio, Positions, Position + +log = Logger('ExchangePortfolio') + + +class ExchangePortfolio(Portfolio): + """ + Since the goal is to support multiple exchanges, it makes sense to + include additional stats in the portfolio object. + + Instead of relying on the performance tracker, each exchange portfolio + tracks its own holding. This offers a separation between tracking an + exchange and the statistics of the algorithm. + """ + + def __init__(self, start_date, starting_cash=None): + self.capital_used = 0.0 + self.starting_cash = starting_cash + self.portfolio_value = starting_cash + self.pnl = 0.0 + self.returns = 0.0 + self.cash = starting_cash + self.positions = Positions() + self.start_date = start_date + self.positions_value = 0.0 + self.open_orders = dict() + + def calculate_pnl(self): + log.debug('calculating pnl') + + def create_order(self, order): + log.debug('creating order {}'.format(order.id)) + self.open_orders[order.id] = order + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + order_position = Position(order.asset) + self.positions[order.asset] = order_position + + order_position.amount += order.amount + log.debug('open order added to portfolio') + + def execute_order(self, order, transaction): + log.debug('executing order {}'.format(order.id)) + del self.open_orders[order.id] + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + raise ValueError( + 'Trying to execute order for a position not held: %s' % order.id + ) + + self.capital_used += order.amount * transaction.price + + if order.amount > 0: + if order_position.cost_basis > 0: + order_position.cost_basis = np.average( + [order_position.cost_basis, transaction.price], + weights=[order_position.amount, order.amount] + ) + else: + order_position.cost_basis = transaction.price + + log.debug('updated portfolio with executed order') + + def remove_order(self, order): + log.info('removing cancelled order {}'.format(order.id)) + del self.open_orders[order.id] + + order_position = self.positions[order.asset] \ + if order.asset in self.positions else None + + if order_position is None: + raise ValueError( + 'Trying to remove order for a position not held: %s' % order.id + ) + + order_position.amount -= order.amount + + log.debug('removed order from portfolio') diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py new file mode 100644 index 00000000..40b8f32f --- /dev/null +++ b/catalyst/exchange/exchange_utils.py @@ -0,0 +1,133 @@ +import json +import os +import pickle +import urllib +from datetime import date, datetime + +from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \ + ExchangeSymbolsNotFound +from catalyst.utils.paths import data_root, ensure_directory + +SYMBOLS_URL = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \ + 'live-trading/catalyst/exchange/symbols/{exchange}.json' + + +def get_exchange_folder(exchange_name, environ=None): + if not environ: + environ = os.environ + + root = data_root(environ) + exchange_folder = os.path.join(root, 'exchanges', exchange_name) + ensure_directory(exchange_folder) + + return exchange_folder + + +def download_exchange_symbols(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'symbols.json') + + url = SYMBOLS_URL.format(exchange=exchange_name) + response = urllib.urlretrieve(url=url, filename=filename) + return response + + +def get_exchange_symbols(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'symbols.json') + + if not os.path.isfile(filename): + download_exchange_symbols(exchange_name, environ) + + if os.path.isfile(filename): + with open(filename) as data_file: + data = json.load(data_file) + return data + else: + raise ExchangeSymbolsNotFound( + exchange=exchange_name, + filename=filename + ) + + +def get_exchange_auth(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + filename = os.path.join(exchange_folder, 'auth.json') + + if os.path.isfile(filename): + with open(filename) as data_file: + data = json.load(data_file) + return data + else: + raise ExchangeAuthNotFound( + exchange=exchange_name, + filename=filename + ) + + +def get_algo_folder(algo_name, environ=None): + if not environ: + environ = os.environ + + root = data_root(environ) + algo_folder = os.path.join(root, 'live_algos', algo_name) + ensure_directory(algo_folder) + + return algo_folder + + +def get_algo_object(algo_name, key, environ=None, rel_path=None): + folder = get_algo_folder(algo_name, environ) + + if rel_path is not None: + folder = os.path.join(folder, rel_path) + + filename = os.path.join(folder, key + '.p') + + if os.path.isfile(filename): + try: + with open(filename, 'rb') as handle: + return pickle.load(handle) + except Exception as e: + return None + else: + return None + + +def save_algo_object(algo_name, key, obj, environ=None, rel_path=None): + folder = get_algo_folder(algo_name, environ) + + if rel_path is not None: + folder = os.path.join(folder, rel_path) + ensure_directory(folder) + + filename = os.path.join(folder, key + '.p') + + with open(filename, 'wb') as handle: + pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def append_algo_object(algo_name, key, obj, environ=None): + algo_folder = get_algo_folder(algo_name, environ) + filename = os.path.join(algo_folder, key + '.p') + + mode = 'a+b' if os.path.isfile(filename) else 'wb' + with open(filename, mode) as handle: + pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def get_exchange_minute_writer_root(exchange_name, environ=None): + exchange_folder = get_exchange_folder(exchange_name, environ) + + minute_data_folder = os.path.join(exchange_folder, 'minute_data') + ensure_directory(minute_data_folder) + + return minute_data_folder + + +def perf_serial(obj): + """JSON serializer for objects not serializable by default json code""" + + if isinstance(obj, (datetime, date)): + return obj.isoformat() + raise TypeError("Type %s not serializable" % type(obj))