Creating a clean branch for live trading

This commit is contained in:
fredfortier
2017-08-27 15:19:13 -04:00
parent 3a44a3cc1f
commit b8d442cf89
15 changed files with 2394 additions and 0 deletions
+4
View File
@@ -78,3 +78,7 @@ zipline.iml
./data
TAGS
python2
python3
scratch
+155
View File
@@ -0,0 +1,155 @@
import talib
from logbook import Logger
from catalyst.api import (
order,
order_target_percent,
symbol,
record,
get_open_orders,
)
from catalyst.utils.run_algo import run_algorithm
algo_namespace = 'buy_the_dip_live'
log = Logger(algo_namespace)
def initialize(context):
log.info('initializing algo')
context.ASSET_NAME = 'XRP_USD'
context.asset = symbol(context.ASSET_NAME)
context.TARGET_POSITIONS = 5000
context.PROFIT_TARGET = 0.1
context.SLIPPAGE_ALLOWED = 0.02
context.retry_check_open_orders = 10
context.retry_update_portfolio = 10
context.retry_order = 5
context.errors = []
pass
def _handle_data(context, data):
prices = data.history(
context.asset,
fields='price',
bar_count=20,
frequency='15m'
)
rsi = talib.RSI(prices.values, timeperiod=14)[-1]
log.info('got rsi: {}'.format(rsi))
# Buying more when RSI is low, this should lower our cost basis
if rsi <= 30:
buy_increment = 50
elif rsi <= 40:
buy_increment = 20
elif rsi <= 70:
buy_increment = 5
else:
buy_increment = None
cash = context.portfolio.cash
log.info('base currency available: {cash}'.format(cash=cash))
price = data.current(context.asset, 'price')
log.info('got price {price}'.format(price=price))
record(
price=price,
rsi=rsi,
)
orders = get_open_orders(context.asset)
if orders:
log.info('skipping bar until all open orders execute')
return
is_buy = False
cost_basis = None
if context.asset in context.portfolio.positions:
position = context.portfolio.positions[context.asset]
cost_basis = position.cost_basis
log.info(
'found {amount} positions with cost basis {cost_basis}'.format(
amount=position.amount,
cost_basis=cost_basis
)
)
if position.amount >= context.TARGET_POSITIONS:
log.info('reached positions target: {}'.format(position.amount))
return
if price < cost_basis:
is_buy = True
elif position.amount > 0 and \
price > cost_basis * (1 + context.PROFIT_TARGET):
profit = (price * position.amount) - (cost_basis * position.amount)
log.info('closing position, taking profit: {}'.format(profit))
order_target_percent(
asset=context.asset,
target=0,
limit_price=price * (1 - context.SLIPPAGE_ALLOWED),
)
else:
log.info('no buy or sell opportunity found')
else:
is_buy = True
if is_buy:
if buy_increment is None:
log.info('the rsi is too high to consider buying {}'.format(rsi))
return
if price * buy_increment > cash:
log.info('not enough base currency to consider buying')
return
log.info(
'buying position cheaper than cost basis {} < {}'.format(
price,
cost_basis
)
)
order(
asset=context.asset,
amount=buy_increment,
limit_price=price * (1 + context.SLIPPAGE_ALLOWED)
)
def handle_data(context, data):
log.info('handling bar {}'.format(data.current_dt))
# try:
_handle_data(context, data)
# except Exception as e:
# log.warn('aborting the bar on error {}'.format(e))
# context.errors.append(e)
log.info('completed bar {}, total execution errors {}'.format(
data.current_dt,
len(context.errors)
))
if len(context.errors) > 0:
log.info('the errors:\n{}'.format(context.errors))
def analyze(context, stats):
log.info('the full stats:\n{}'.format(stats.head()))
pass
run_algorithm(
initialize=initialize,
handle_data=handle_data,
analyze=analyze,
exchange_name='bitfinex',
live=True,
algo_namespace=algo_namespace,
base_currency='usd'
)
View File
+437
View File
@@ -0,0 +1,437 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
import pickle
from datetime import timedelta
from time import sleep
from os import listdir
from os.path import isfile, join
import logbook
import pandas as pd
import catalyst.protocol as zp
from catalyst.algorithm import TradingAlgorithm
from catalyst.data.minute_bars import BcolzMinuteBarWriter, \
BcolzMinuteBarReader
from catalyst.errors import OrderInBeforeTradingStart
from catalyst.exchange.exchange_clock import ExchangeClock
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangePortfolioDataError,
ExchangeTransactionError
)
from catalyst.exchange.exchange_utils import get_exchange_minute_writer_root, \
save_algo_object, get_algo_object, get_algo_folder
from catalyst.finance.performance.period import calc_period_stats
from catalyst.gens.tradesimulation import AlgorithmSimulator
from catalyst.utils.api_support import (
api_method,
disallowed_in_before_trading_start)
from catalyst.utils.input_validation import error_keywords
log = logbook.Logger("ExchangeTradingAlgorithm")
class ExchangeAlgorithmExecutor(AlgorithmSimulator):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
class ExchangeTradingAlgorithm(TradingAlgorithm):
def __init__(self, *args, **kwargs):
self.exchange = kwargs.pop('exchange', None)
self.algo_namespace = kwargs.pop('algo_namespace', None)
self.orders = {}
self.is_running = True
self.retry_check_open_orders = 5
self.retry_update_portfolio = 5
self.retry_get_open_orders = 5
self.retry_order = 2
self.retry_delay = 5
super(self.__class__, self).__init__(*args, **kwargs)
self._create_minute_writer()
signal.signal(signal.SIGINT, self.signal_handler)
log.info('exchange trading algorithm successfully initialized')
def _create_minute_writer(self):
root = get_exchange_minute_writer_root(self.exchange.name)
filename = os.path.join(root, 'metadata.json')
if os.path.isfile(filename):
writer = BcolzMinuteBarWriter.open(
root, self.sim_params.end_session)
else:
writer = BcolzMinuteBarWriter(
rootdir=root,
calendar=self.trading_calendar,
minutes_per_day=1440,
start_session=self.sim_params.start_session,
end_session=self.sim_params.end_session,
write_metadata=True
)
self.exchange.minute_writer = writer
self.exchange.minute_reader = BcolzMinuteBarReader(root)
def signal_handler(self, signal, frame):
self.is_running = False
log.info('You pressed Ctrl+C!')
stats = None
try:
algo_folder = get_algo_folder(self.algo_namespace)
folder = join(algo_folder, 'daily_perf')
files = [f for f in listdir(folder) if isfile(join(folder, f))]
daily_perf_list = []
for item in files:
filename = join(folder, item)
with open(filename, 'rb') as handle:
daily_perf_list.append(pickle.load(handle))
stats = pd.DataFrame(daily_perf_list)
stats.set_index('period_close', drop=True, inplace=True)
except Exception as e:
log.warn('Unable to compute daily stats: {}'.format(e))
self.analyze(stats)
sys.exit(0)
def _create_clock(self):
# The calendar's execution times are the minutes over which we actually
# want to run the clock. Typically the execution times simply adhere to
# the market open and close times. In the case of the futures calendar,
# for example, we only want to simulate over a subset of the full 24
# hour calendar, so the execution times dictate a market open time of
# 6:31am US/Eastern and a close of 5:00pm US/Eastern.
# In our case, we are trading around the clock, so the market close
# corresponds to the last minute of the day.
# This method is taken from TradingAlgorithm.
# The clock has been replaced to use RealtimeClock
# TODO: should we apply a time skew? not sure to understand the utility.
return ExchangeClock(
self.sim_params.sessions,
time_skew=self.exchange.time_skew
)
def _create_generator(self, sim_params):
if self.perf_tracker is None:
self.perf_tracker = get_algo_object(
algo_name=self.algo_namespace,
key='perf_tracker'
)
# Call the simulation trading algorithm for side-effects:
# it creates the perf tracker
TradingAlgorithm._create_generator(self, sim_params)
self.trading_client = ExchangeAlgorithmExecutor(
self,
sim_params,
self.data_portal,
self._create_clock(),
self._create_benchmark_source(),
self.restrictions,
universe_func=self._calculate_universe
)
return self.trading_client.transform()
def updated_portfolio(self):
"""
We skip the entire performance tracker business and update the
portfolio directly.
:return:
"""
return self.exchange.portfolio
def updated_account(self):
return self.exchange.account
def _update_portfolio(self, attempt_index=0):
try:
self.exchange.update_portfolio()
# Applying the updated last_sales_price to the positions
# in the performance tracker. This seems a bit redundant
# but it will make sense when we have multiple exchange portfolios
# feeding into the same performance tracker.
tracker = self.perf_tracker.todays_performance.position_tracker
for asset in self.exchange.portfolio.positions:
position = self.exchange.portfolio.positions[asset]
tracker.update_position(
asset=asset,
last_sale_date=position.last_sale_date,
last_sale_price=position.last_sale_price
)
except ExchangeRequestError as e:
log.warn(
'update portfolio attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_update_portfolio:
sleep(self.retry_delay)
self._update_portfolio(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='update-portfolio',
attempts=attempt_index,
error=e
)
def _check_open_orders(self, attempt_index=0):
try:
return self.exchange.check_open_orders()
except ExchangeRequestError as e:
log.warn(
'check open orders attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_check_open_orders:
sleep(self.retry_delay)
return self._check_open_orders(attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='order-status',
attempts=attempt_index,
error=e
)
def prepare_period_stats(self, start_dt, end_dt):
"""
Creates a dictionary representing the state of the tracker.
I rewrote this in an attempt to better control the stats.
I don't want things to happen magically through complex logic
pertaining to backtesting.
"""
tracker = self.perf_tracker
period = tracker.todays_performance
pos_stats = period.position_tracker.stats()
period_stats = calc_period_stats(pos_stats, period.ending_cash)
stats = dict(
period_start=tracker.period_start,
period_end=tracker.period_end,
capital_base=tracker.capital_base,
progress=tracker.progress,
ending_value=period.ending_value,
ending_exposure=period.ending_exposure,
capital_used=period.cash_flow,
starting_value=period.starting_value,
starting_exposure=period.starting_exposure,
starting_cash=period.starting_cash,
ending_cash=period.ending_cash,
portfolio_value=period.ending_cash + period.ending_value,
pnl=period.pnl,
returns=period.returns,
period_open=period.period_open,
period_close=period.period_close,
gross_leverage=period_stats.gross_leverage,
net_leverage=period_stats.net_leverage,
short_exposure=pos_stats.short_exposure,
long_exposure=pos_stats.long_exposure,
short_value=pos_stats.short_value,
long_value=pos_stats.long_value,
longs_count=pos_stats.longs_count,
shorts_count=pos_stats.shorts_count,
)
# Merging cumulative risk
stats.update(tracker.cumulative_risk_metrics.to_dict())
# Merging latest recorded variables
stats.update(self.recorded_vars)
stats['positions'] = period.position_tracker.get_positions_list()
# we want the key to be absent, not just empty
# Only include transactions for given dt
stats['transactions'] = dict()
for date in period.processed_transactions:
if start_dt <= date < end_dt:
stats['transactions'][date] = \
period.processed_transactions[date]
stats['orders'] = dict()
for date in period.orders_by_modified:
if start_dt <= date < end_dt:
stats['orders'][date] = \
period.orders_by_modified[date]
return stats
def handle_data(self, data):
if not self.is_running:
return
self._update_portfolio()
transactions = self._check_open_orders()
for transaction in transactions:
self.perf_tracker.process_transaction(transaction)
if self._handle_data:
self._handle_data(self, data)
# Unlike trading controls which remain constant unless placing an
# order, account controls can change each bar. Thus, must check
# every bar no matter if the algorithm places an order or not.
self.validate_account_controls()
try:
# Since the clock runs 24/7, I trying to disable the daily
# Performance tracker and keep only minute and cumulative
self.perf_tracker.update_performance()
# TODO: save for future use?
minute_stats = self.prepare_period_stats(
data.current_dt, data.current_dt + timedelta(minutes=1))
log.debug('the minute performance:\n{}'.format(minute_stats))
today = pd.to_datetime('today', utc=True)
daily_stats = self.prepare_period_stats(
start_dt=today,
end_dt=pd.Timestamp.utcnow()
)
save_algo_object(
algo_name=self.algo_namespace,
key=today.strftime('%Y-%m-%d'),
obj=daily_stats,
rel_path='daily_perf'
)
except Exception as e:
log.warn('unable to calculate performance: {}'.format(e))
try:
save_algo_object(
algo_name=self.algo_namespace,
key='perf_tracker',
obj=self.perf_tracker
)
except Exception as e:
log.warn('unable to save minute perfs to disk: {}'.format(e))
try:
save_algo_object(
algo_name=self.algo_namespace,
key='portfolio_{}'.format(self.exchange.name),
obj=self.exchange.portfolio
)
except Exception as e:
log.warn('unable to save portfolio to disk: {}'.format(e))
def _order(self,
asset,
amount,
limit_price=None,
stop_price=None,
style=None,
attempt_index=0):
try:
return self.exchange.order(asset, amount, limit_price,
stop_price,
style)
except ExchangeRequestError as e:
log.warn(
'order attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_order:
sleep(self.retry_delay)
return self._order(
asset, amount, limit_price, stop_price, style,
attempt_index + 1)
else:
raise ExchangeTransactionError(
transaction_type='order',
attempts=attempt_index,
error=e
)
@api_method
@disallowed_in_before_trading_start(OrderInBeforeTradingStart())
def order(self,
asset,
amount,
limit_price=None,
stop_price=None,
style=None):
amount, style = self._calculate_order(asset, amount,
limit_price, stop_price,
style)
order_id = self._order(asset, amount, limit_price, stop_price, style)
order = self.portfolio.open_orders[order_id]
self.perf_tracker.process_order(order)
return order
def round_order(self, amount):
"""
We need fractions with cryptocurrencies
:param amount:
:return:
"""
return amount
@api_method
def batch_market_order(self, share_counts):
raise NotImplementedError()
def _get_open_orders(self, asset=None, attempt_index=0):
try:
return self.exchange.get_open_orders(asset)
except ExchangeRequestError as e:
log.warn(
'open orders attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_get_open_orders:
sleep(self.retry_delay)
return self._get_open_orders(asset, attempt_index + 1)
else:
raise ExchangePortfolioDataError(
data_type='open-orders',
attempts=attempt_index,
error=e
)
@error_keywords(sid='Keyword argument `sid` is no longer supported for '
'get_open_orders. Use `asset` instead.')
@api_method
def get_open_orders(self, asset=None):
return self._get_open_orders(asset)
@api_method
def get_order(self, order_id):
return self.exchange.get_order(order_id)
@api_method
def cancel_order(self, order_param):
order_id = order_param
if isinstance(order_param, zp.Order):
order_id = order_param.id
self.exchange.cancel_order(order_id)
@@ -0,0 +1,91 @@
from logbook import Logger
log = Logger('AssetFinderExchange')
class AssetFinderExchange(object):
def __init__(self, exchange):
self.exchange = exchange
self._asset_cache = {}
@property
def sids(self):
"""
This seems to be used to pre-fetch assets.
I don't think that we need this for live-trading.
Leaving the list empty.
"""
return list()
def retrieve_all(self, sids, default_none=False):
"""
Retrieve all assets in `sids`.
Parameters
----------
sids : iterable of int
Assets to retrieve.
default_none : bool
If True, return None for failed lookups.
If False, raise `SidsNotFound`.
Returns
-------
assets : list[Asset or None]
A list of the same length as `sids` containing Assets (or Nones)
corresponding to the requested sids.
Raises
------
SidsNotFound
When a requested sid is not found and default_none=False.
"""
for sid in sids:
if sid in self._asset_cache:
log.info('got asset from cache: {}'.format(sid))
else:
log.info('fetching asset: {}'.format(sid))
return list()
def lookup_symbol(self, symbol, as_of_date, fuzzy=False):
"""Lookup an asset by symbol.
Parameters
----------
symbol : str
The ticker symbol to resolve.
as_of_date : datetime or None
Look up the last owner of this symbol as of this datetime.
If ``as_of_date`` is None, then this can only resolve the equity
if exactly one equity has ever owned the ticker.
fuzzy : bool, optional
Should fuzzy symbol matching be used? Fuzzy symbol matching
attempts to resolve differences in representations for
shareclasses. For example, some people may represent the ``A``
shareclass of ``BRK`` as ``BRK.A``, where others could write
``BRK_A``.
Returns
-------
equity : Asset
The equity that held ``symbol`` on the given ``as_of_date``, or the
only equity to hold ``symbol`` if ``as_of_date`` is None.
Raises
------
SymbolNotFound
Raised when no equity has ever held the given symbol.
MultipleSymbolsFound
Raised when no ``as_of_date`` is given and more than one equity
has held ``symbol``. This is also raised when ``fuzzy=True`` and
there are multiple candidates for the given ``symbol`` on the
``as_of_date``.
"""
log.info('looking up symbol: {}'.format(symbol))
if symbol in self._asset_cache:
return self._asset_cache[symbol]
else:
asset = self.exchange.get_asset(symbol)
self._asset_cache[symbol] = asset
return asset
+647
View File
@@ -0,0 +1,647 @@
import base64
import numpy as np
import hashlib
import hmac
import json
import re
import time
import pandas as pd
import pytz
import requests
import six
from catalyst.assets._assets import Asset
from logbook import Logger
# from websocket import create_connection
from catalyst.exchange.exchange import Exchange
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
InvalidHistoryFrequencyError
)
from catalyst.finance.execution import (MarketOrder,
LimitOrder,
StopOrder,
StopLimitOrder)
from catalyst.finance.order import Order, ORDER_STATUS
from catalyst.protocol import Account
# Trying to account for REST api instability
# https://stackoverflow.com/questions/15431044/can-i-set-max-retries-for-requests-request
requests.adapters.DEFAULT_RETRIES = 20
BITFINEX_URL = 'https://api.bitfinex.com'
log = Logger('Bitfinex')
warning_logger = Logger('AlgoWarning')
class Bitfinex(Exchange):
def __init__(self, key, secret, base_currency, portfolio=None):
self.url = BITFINEX_URL
self.key = key
self.secret = secret
self.id = 'b'
self.name = 'bitfinex'
self.assets = {}
self.load_assets()
self.base_currency = base_currency
self._portfolio = portfolio
self.minute_writer = None
self.minute_reader = None
def _request(self, operation, data, version='v1'):
payload_object = {
'request': '/{}/{}'.format(version, operation),
'nonce': '{0:f}'.format(time.time() * 1000000),
# convert to string
'options': {}
}
if data is None:
payload_dict = payload_object
else:
payload_dict = payload_object.copy()
payload_dict.update(data)
payload_json = json.dumps(payload_dict)
if six.PY3:
payload = base64.b64encode(bytes(payload_json, 'utf-8'))
else:
payload = base64.b64encode(payload_json)
m = hmac.new(self.secret, payload, hashlib.sha384)
m = m.hexdigest()
# headers
headers = {
'X-BFX-APIKEY': self.key,
'X-BFX-PAYLOAD': payload,
'X-BFX-SIGNATURE': m
}
if data is None:
request = requests.get(
'{url}/{version}/{operation}'.format(
url=self.url,
version=version,
operation=operation
), data={},
headers=headers)
else:
request = requests.post(
'{url}/{version}/{operation}'.format(
url=self.url,
version=version,
operation=operation
),
headers=headers)
return request
def _get_v2_symbol(self, asset):
pair = asset.symbol.split('_')
symbol = 't' + pair[0].upper() + pair[1].upper()
return symbol
def _get_v2_symbols(self, assets):
"""
Workaround to support Bitfinex v2
TODO: Might require a separate asset dictionary
:param assets:
:return:
"""
v2_symbols = []
for asset in assets:
v2_symbols.append(self._get_v2_symbol(asset))
return v2_symbols
def _create_order(self, order_status):
"""
Create a Catalyst order object from a Bitfinex order dictionary
:param order_status:
:return: Order
"""
if order_status['is_cancelled']:
status = ORDER_STATUS.CANCELLED
elif not order_status['is_live']:
log.info('found executed order {}'.format(order_status))
status = ORDER_STATUS.FILLED
else:
status = ORDER_STATUS.OPEN
amount = float(order_status['original_amount'])
filled = float(order_status['executed_amount'])
is_buy = (amount > 0)
price = float(order_status['price'])
order_type = order_status['type']
stop_price = None
limit_price = None
# TODO: is this comprehensive enough?
if order_type.endswith('limit'):
limit_price = price
elif order_type.endswith('stop'):
stop_price = price
executed_price = float(order_status['avg_execution_price'])
# TODO: bitfinex does not specify comission. I could calculate it but not sure if it's worth it.
commission = None
# TODO: zipline likes rounded dates to match statistics, is this ok?
date = pd.Timestamp.utcfromtimestamp(float(order_status['timestamp']))
date = pytz.utc.localize(date)
order = Order(
dt=date,
asset=self.assets[order_status['symbol']],
amount=amount,
stop=stop_price,
limit=limit_price,
filled=filled,
id=order_status['id'],
commission=commission
)
order.status = status
return order, executed_price
def update_portfolio(self):
"""
Update the portfolio cash and position balances based on the
latest ticker prices.
:return:
"""
try:
response = self._request('balances', None)
balances = response.json()
except Exception as e:
raise ExchangeRequestError(error=e)
if 'message' in balances:
raise ExchangeRequestError(
error='unable to fetch balance {}'.format(balances['message'])
)
base_position = None
for position in balances:
if not base_position and position['type'] == 'exchange' \
and position['currency'] == self.base_currency:
base_position = position
if position is None:
raise ValueError(
error='Base currency %s not found in portfolio' % self.base_currency
)
portfolio = self._portfolio
portfolio.cash = float(base_position['available'])
if portfolio.starting_cash is None:
portfolio.starting_cash = portfolio.cash
if portfolio.positions:
assets = portfolio.positions.keys()
tickers = self.tickers(assets)
portfolio.positions_value = 0.0
for ticker in tickers:
# TODO: convert if the position is not in the base currency
position = portfolio.positions[ticker['asset']]
position.last_sale_price = ticker['last_price']
position.last_sale_date = ticker['timestamp']
portfolio.positions_value += \
position.amount * position.last_sale_price
portfolio.portfolio_value = \
portfolio.positions_value + portfolio.cash
@property
def portfolio(self):
"""
Return the Portfolio
:return:
"""
# if self._portfolio is None:
# portfolio = ExchangePortfolio(
# start_date=pd.Timestamp.utcnow()
# )
# self.store.portfolio = portfolio
# self.update_portfolio()
#
# portfolio.starting_cash = portfolio.cash
# else:
# portfolio = self.store.portfolio
return self._portfolio
@property
def account(self):
account = Account()
account.settled_cash = None
account.accrued_interest = None
account.buying_power = None
account.equity_with_loan = None
account.total_positions_value = None
account.total_positions_exposure = None
account.regt_equity = None
account.regt_margin = None
account.initial_margin_requirement = None
account.maintenance_margin_requirement = None
account.available_funds = None
account.excess_liquidity = None
account.cushion = None
account.day_trades_remaining = None
account.leverage = None
account.net_leverage = None
account.net_liquidation = None
return account
@property
def positions(self):
return self.portfolio.positions
@property
def time_skew(self):
# TODO: research the time skew conditions
return pd.Timedelta('0s')
def subscribe_to_market_data(self, symbol):
pass
def get_candles(self, data_frequency, assets, bar_count=None):
"""
Retrieve OHLVC candles from Bitfinex
:param data_frequency:
:param assets:
:param bar_count:
:return:
Available Frequencies
---------------------
'1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D', '7D', '14D',
'1M'
"""
# TODO: use BcolzMinuteBarReader to read from cache
freq_match = re.match(r'([0-9].*)(m|h|d)', data_frequency, re.M | re.I)
if freq_match:
number = int(freq_match.group(1))
unit = freq_match.group(2)
if unit == 'd':
converted_unit = 'D'
else:
converted_unit = unit
frequency = '{}{}'.format(number, converted_unit)
allowed_frequencies = ['1m', '5m', '15m', '30m', '1h', '3h', '6h',
'12h', '1D', '7D', '14D', '1M']
if frequency not in allowed_frequencies:
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
elif data_frequency == 'minute':
frequency = '1m'
elif data_frequency == 'daily':
frequency = '1D'
else:
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
# Making sure that assets are iterable
asset_list = [assets] if isinstance(assets, Asset) else assets
ohlc_list = dict()
for asset in asset_list:
symbol = self._get_v2_symbol(asset)
url = '{url}/v2/candles/trade:{frequency}:{symbol}'.format(
url=self.url,
frequency=frequency,
symbol=symbol
)
if bar_count:
is_list = True
url += '/hist?limit={}'.format(int(bar_count))
else:
is_list = False
url += '/last'
try:
response = requests.get(url)
except Exception as e:
raise ExchangeRequestError(error=e)
if 'error' in response.content:
raise ExchangeRequestError(
error='Unable to retrieve candles: {}'.format(
response.content)
)
candles = response.json()
def ohlc_from_candle(candle):
ohlc = dict(
open=np.float64(candle[1]),
high=np.float64(candle[3]),
low=np.float64(candle[4]),
close=np.float64(candle[2]),
volume=np.float64(candle[5]),
price=np.float64(candle[2]),
last_traded=pd.Timestamp.utcfromtimestamp(
candle[0] / 1000.0),
minute_dt=pd.Timestamp.utcnow().floor('1 min')
)
return ohlc
if is_list:
ohlc_bars = []
# We can to list candles from old to new
for candle in reversed(candles):
ohlc = ohlc_from_candle(candle)
ohlc_bars.append(ohlc)
ohlc_list[asset] = ohlc_bars
else:
ohlc = ohlc_from_candle(candles)
ohlc_list[asset] = ohlc
return ohlc_list[assets] \
if isinstance(assets, Asset) else ohlc_list
def order(self, asset, amount, limit_price, stop_price, style):
"""Place an order.
Parameters
----------
asset : Asset
The asset that this order is for.
amount : int
The amount of shares to order. If ``amount`` is positive, this is
the number of shares to buy or cover. If ``amount`` is negative,
this is the number of shares to sell or short.
limit_price : float, optional
The limit price for the order.
stop_price : float, optional
The stop price for the order.
style : ExecutionStyle, optional
The execution style for the order.
Returns
-------
order_id : str or None
The unique identifier for this order, or None if no order was
placed.
Notes
-----
The ``limit_price`` and ``stop_price`` arguments provide shorthands for
passing common execution styles. Passing ``limit_price=N`` is
equivalent to ``style=LimitOrder(N)``. Similarly, passing
``stop_price=M`` is equivalent to ``style=StopOrder(M)``, and passing
``limit_price=N`` and ``stop_price=M`` is equivalent to
``style=StopLimitOrder(N, M)``. It is an error to pass both a ``style``
and ``limit_price`` or ``stop_price``.
Bitfinex Order Types
--------------------
LIMIT, MARKET, STOP, TRAILING STOP,
EXCHANGE MARKET, EXCHANGE LIMIT, EXCHANGE STOP,
EXCHANGE TRAILING STOP, FOK, EXCHANGE FOK.
See Also
--------
:class:`catalyst.finance.execution.ExecutionStyle`
:func:`catalyst.api.order_value`
:func:`catalyst.api.order_percent`
"""
if amount == 0:
log.warn('skipping order amount of 0')
return None
base_currency = asset.symbol.split('_')[1]
if base_currency.lower() != self.base_currency.lower():
raise NotImplementedError(
'Currency pairs must share their base with the exchange.'
)
is_buy = (amount > 0)
if isinstance(style, MarketOrder):
order_type = 'market'
elif isinstance(style, LimitOrder):
order_type = 'limit'
price = limit_price
elif isinstance(style, StopOrder):
order_type = 'stop'
price = stop_price
elif isinstance(style, StopLimitOrder):
log.warn('using limit order instead of stop/limit')
# TODO: Not sure how to do this with the api. Investigate.
order_type = 'limit'
price = limit_price
else:
raise NotImplementedError('%s orders not available' % style)
log.debug(
'ordering {amount} {symbol} for {price}'.format(
amount=amount,
symbol=asset.symbol,
price=price
)
)
exchange_symbol = self.get_symbol(asset)
req = dict(
symbol=exchange_symbol,
amount=str(float(abs(amount))),
price=str(float(price)),
side='buy' if is_buy else 'sell',
type='exchange ' + order_type, # TODO: support margin trades
exchange=self.name,
is_hidden=False,
is_postonly=False,
use_all_available=0,
ocoorder=False,
buy_price_oco=0,
sell_price_oco=0
)
date = pd.Timestamp.utcnow()
try:
response = self._request('order/new', req)
exchange_order = response.json()
except Exception as e:
raise ExchangeRequestError(error=e)
if 'message' in exchange_order:
raise ExchangeRequestError(
error='unable to create Bitfinex order {}'.format(
exchange_order['message'])
)
order_id = exchange_order['id']
order = Order(
dt=date,
asset=asset,
amount=amount,
stop=style.get_stop_price(is_buy),
limit=style.get_limit_price(is_buy),
id=order_id
)
# TODO: is this required?
order.broker_order_id = order_id
self.portfolio.create_order(order)
return order_id
def get_open_orders(self, asset=None):
"""Retrieve all of the current open orders.
Parameters
----------
asset : Asset
If passed and not None, return only the open orders for the given
asset instead of all open orders.
Returns
-------
open_orders : dict[list[Order]] or list[Order]
If no asset is passed this will return a dict mapping Assets
to a list containing all the open orders for the asset.
If an asset is passed then this will return a list of the open
orders for this asset.
"""
try:
response = self._request('orders', None)
order_statuses = response.json()
except Exception as e:
raise ExchangeRequestError(error=e)
if 'message' in order_statuses:
raise ExchangeRequestError(
error='Unable to retrieve open orders: {}'.format(
order_statuses['message'])
)
orders = list()
for order_status in order_statuses:
order, = self._create_order(order_status)
if asset is None or asset == order.sid:
orders.append(order)
return orders
def get_order(self, order_id):
"""Lookup an order based on the order id returned from one of the
order functions.
Parameters
----------
order_id : str
The unique identifier for the order.
Returns
-------
order : Order
The order object.
"""
try:
response = self._request(
'order/status', {'order_id': int(order_id)})
order_status = response.json()
except Exception as e:
raise ExchangeRequestError(error=e)
if 'message' in order_status:
raise ExchangeRequestError(
error='Unable to retrieve order status: {}'.format(
order_status['message'])
)
return self._create_order(order_status)
def cancel_order(self, order_param):
"""Cancel an open order.
Parameters
----------
order_param : str or Order
The order_id or order object to cancel.
"""
order_id = order_param.id \
if isinstance(order_param, Order) else order_param
try:
response = self._request('order/cancel', {'order_id': order_id})
status = response.json()
except Exception as e:
raise ExchangeRequestError(error=e)
if 'message' in status:
raise ExchangeRequestError(
error='Unable to cancel order: {} {}'.format(
order_id, status['message'])
)
def tickers(self, assets):
"""
Fetch ticket data for assets
https://docs.bitfinex.com/v2/reference#rest-public-tickers
:param assets:
:return:
"""
symbols = self._get_v2_symbols(assets)
log.debug('fetching tickers {}'.format(symbols))
try:
response = requests.get(
'{url}/v2/tickers?symbols={symbols}'.format(
url=self.url,
symbols=','.join(symbols),
)
)
except Exception as e:
raise ExchangeRequestError(error=e)
if 'error' in response.content:
raise ExchangeRequestError(
error='Unable to retrieve tickers: {}'.format(
response.content)
)
tickers = response.json()
formatted_tickers = []
for index, ticker in enumerate(tickers):
if not len(ticker) == 11:
raise ExchangeRequestError(
error='Invalid ticker in response: {}'.format(ticker)
)
tick = dict(
asset=assets[index],
timestamp=pd.Timestamp.utcnow(),
bid=ticker[1],
ask=ticker[3],
last_price=ticker[7],
low=ticker[10],
high=ticker[9],
volume=ticker[8],
)
formatted_tickers.append(tick)
log.debug('got tickers {}'.format(formatted_tickers))
return formatted_tickers
+110
View File
@@ -0,0 +1,110 @@
{
"btcusd": {
"symbol": "btc_usd",
"start_date": "2010-01-01"
},
"ltcusd": {
"symbol": "ltc_usd",
"start_date": "2010-01-01"
},
"ltcbtc": {
"symbol": "ltc_btc",
"start_date": "2010-01-01"
},
"ethusd": {
"symbol": "eth_usd",
"start_date": "2010-01-01"
},
"ethbtc": {
"symbol": "eth_btc",
"start_date": "2010-01-01"
},
"etcbtc": {
"symbol": "etc_btc",
"start_date": "2010-01-01"
},
"etcusd": {
"symbol": "etc_usd",
"start_date": "2010-01-01"
},
"rrtusd": {
"symbol": "rrt_usd",
"start_date": "2010-01-01"
},
"rrtbtc": {
"symbol": "rrt_btc",
"start_date": "2010-01-01"
},
"zecusd": {
"symbol": "zec_usd",
"start_date": "2010-01-01"
},
"zecbtc": {
"symbol": "zec_btc",
"start_date": "2010-01-01"
},
"xmrusd": {
"symbol": "xmr_usd",
"start_date": "2010-01-01"
},
"xmrbtc": {
"symbol": "xmr_btc",
"start_date": "2010-01-01"
},
"dshusd": {
"symbol": "dsh_usd",
"start_date": "2010-01-01"
},
"dshbtc": {
"symbol": "dsh_btc",
"start_date": "2010-01-01"
},
"bccbtc": {
"symbol": "bcc_btc",
"start_date": "2010-01-01"
},
"bcubtc": {
"symbol": "bcu_btc",
"start_date": "2010-01-01"
},
"bccusd": {
"symbol": "bcc_usd",
"start_date": "2010-01-01"
},
"bcuusd": {
"symbol": "bcu_usd",
"start_date": "2010-01-01"
},
"xrpusd": {
"symbol": "xrp_usd",
"start_date": "2010-01-01"
},
"xrpbtc": {
"symbol": "xrp_btc",
"start_date": "2010-01-01"
},
"iotusd": {
"symbol": "iot_usd",
"start_date": "2010-01-01"
},
"iotbtc": {
"symbol": "iot_btc",
"start_date": "2010-01-01"
},
"ioteth": {
"symbol": "iot_eth",
"start_date": "2010-01-01"
},
"eosusd": {
"symbol": "eos_usd",
"start_date": "2010-01-01"
},
"eosbtc": {
"symbol": "eos_btc",
"start_date": "2010-01-01"
},
"eoseth": {
"symbol": "eos_eth",
"start_date": "2010-01-01"
}
}
+121
View File
@@ -0,0 +1,121 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from time import sleep
from logbook import Logger
from catalyst.data.data_portal import DataPortal
from catalyst.exchange.exchange_errors import (
ExchangeRequestError,
ExchangeBarDataError
)
log = Logger('DataPortalExchange')
class DataPortalExchange(DataPortal):
def __init__(self, exchange, *args, **kwargs):
self.exchange = exchange
# TODO: put somewhere accessible by each algo
self.retry_get_history_window = 5
self.retry_get_spot_value = 5
self.retry_delay = 5
super(DataPortalExchange, self).__init__(*args, **kwargs)
def _get_history_window(self,
assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill=True,
attempt_index=0):
try:
return self.exchange.get_history_window(
assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill)
except ExchangeRequestError as e:
log.warn(
'get history attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_get_history_window:
sleep(self.retry_delay)
return self._get_history_window(assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill,
attempt_index + 1)
else:
raise ExchangeBarDataError(
data_type='history',
attempts=attempt_index,
error=e
)
def get_history_window(self,
assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill=True):
return self._get_history_window(assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill)
def _get_spot_value(self, assets, field, dt, data_frequency,
attempt_index=0):
try:
return self.exchange.get_spot_value(assets, field, dt,
data_frequency)
except ExchangeRequestError as e:
log.warn(
'get spot value attempt {}: {}'.format(attempt_index, e)
)
if attempt_index < self.retry_get_spot_value:
sleep(self.retry_delay)
return self._get_spot_value(assets, field, dt, data_frequency,
attempt_index + 1)
else:
raise ExchangeBarDataError(
data_type='spot',
attempts=attempt_index,
error=e
)
def get_spot_value(self, assets, field, dt, data_frequency):
return self._get_spot_value(assets, field, dt, data_frequency)
def get_adjusted_value(self, asset, field, dt,
perspective_dt,
data_frequency,
spot_value=None):
# TODO: does this pertain to cryptocurrencies?
raise NotImplementedError("get_adjusted_value is not implemented yet!")
+489
View File
@@ -0,0 +1,489 @@
import abc
import random
from time import sleep
import collections
from abc import ABCMeta, abstractmethod, abstractproperty
from datetime import timedelta
import numpy as np
import pandas as pd
from catalyst.assets._assets import Asset
from logbook import Logger
from catalyst.data.data_portal import BASE_FIELDS
from catalyst.errors import (
SymbolNotFound,
)
from catalyst.finance.order import ORDER_STATUS
from catalyst.finance.transaction import Transaction
from catalyst.exchange.exchange_utils import get_exchange_symbols
log = Logger('Exchange')
class Exchange:
__metaclass__ = ABCMeta
def __init__(self):
self.name = None
self.trading_pairs = None
self.assets = {}
self._portfolio = None
self.minute_writer = None
self.minute_reader = None
@abstractmethod
def subscribe_to_market_data(self, symbol):
pass
@abstractproperty
def positions(self):
pass
@abstractproperty
def update_portfolio(self):
pass
@abstractproperty
def portfolio(self):
pass
@abstractproperty
def account(self):
pass
@abstractproperty
def time_skew(self):
pass
def get_symbol(self, asset):
"""
Get the exchange specific symbol of the given asset.
:param asset: Asset
:return: symbol: str
"""
symbol = None
for key in self.assets:
if not symbol and self.assets[key].symbol == asset.symbol:
symbol = key
if not symbol:
raise ValueError('Currency %s not supported by exchange %s' %
(asset['symbol'], self.name))
return symbol
def get_symbols(self, assets):
"""
Get a list of symbols corresponding to each given asset.
:param assets: Asset[]
:return:
"""
symbols = []
for asset in assets:
symbols.append(self.get_symbol(asset))
return symbols
def get_asset(self, symbol):
"""
Find an Asset on the current exchange based on its Catalyst symbol
:param symbol: the [target]_[base] currency pair symbol
:return: Asset
"""
asset = None
for key in self.assets:
if not asset and self.assets[key].symbol.lower() == symbol.lower():
asset = self.assets[key]
if not asset:
raise SymbolNotFound('Asset not found: %s' % symbol)
return asset
def load_assets(self):
"""
Populate the 'assets' attribute with a dictionary of Assets.
The key of the resulting dictionary is the exchange specific
currency pair symbol. The universal symbol is contained in the
'symbol' attribute of each asset.
Notes
-----
The sid of each asset is calculated based on a numeric hash of the
universal symbol. This simple approach avoids maintaining a mapping
of sids.
This method can be overridden if an exchange offers equivalent data
via its api.
"""
symbol_map = get_exchange_symbols(self.name)
for exchange_symbol in symbol_map:
asset = symbol_map[exchange_symbol]
symbol = asset['symbol']
asset_name = ' / '.join(symbol.split('_')).upper()
asset_obj = Asset(
symbol=symbol,
asset_name=asset_name,
sid=abs(hash(symbol)) % (10 ** 4),
exchange=self.name,
start_date=pd.to_datetime(asset['start_date'], utc=True),
end_date=pd.Timestamp.utcnow() + timedelta(minutes=300000),
)
self.assets[exchange_symbol] = asset_obj
def check_open_orders(self):
"""
Loop through the list of open orders in the Portfolio object.
For each executed order found, create a transaction and apply to the
Portfolio.
:return:
transactions: Transaction[]
"""
transactions = list()
if self.portfolio.open_orders:
for order_id in list(self.portfolio.open_orders):
log.debug('found open order: {}'.format(order_id))
order, executed_price = self.get_order(order_id)
log.debug('got updated order {} {}'.format(
order, executed_price))
if order.status == ORDER_STATUS.FILLED:
transaction = Transaction(
asset=order.asset,
amount=order.amount,
dt=pd.Timestamp.utcnow(),
price=executed_price,
order_id=order.id,
commission=order.commission
)
transactions.append(transaction)
self.portfolio.execute_order(order, transaction)
elif order.status == ORDER_STATUS.CANCELLED:
self.portfolio.remove_order(order)
else:
delta = pd.Timestamp.utcnow() - order.dt
log.info(
'order {order_id} still open after {delta}'.format(
order_id=order_id,
delta=delta
)
)
return transactions
def get_spot_value(self, assets, field, dt=None, data_frequency='minute'):
"""
Public API method that returns a scalar value representing the value
of the desired asset's field at either the given dt.
Parameters
----------
assets : Asset, ContinuousFuture, or iterable of same.
The asset or assets whose data is desired.
field : {'open', 'high', 'low', 'close', 'volume',
'price', 'last_traded'}
The desired field of the asset.
dt : pd.Timestamp
The timestamp for the desired value.
data_frequency : str
The frequency of the data to query; i.e. whether the data is
'daily' or 'minute' bars
Returns
-------
value : float, int, or pd.Timestamp
The spot value of ``field`` for ``asset`` The return type is based
on the ``field`` requested. If the field is one of 'open', 'high',
'low', 'close', or 'price', the value will be a float. If the
``field`` is 'volume' the value will be a int. If the ``field`` is
'last_traded' the value will be a Timestamp.
Bitfinex timeframes
-------------------
Available values: '1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h',
'1D', '7D', '14D', '1M'
"""
if field not in BASE_FIELDS:
raise KeyError('Invalid column: ' + str(field))
if isinstance(assets, collections.Iterable):
values = list()
for asset in assets:
value = self.get_single_spot_value(
asset, field, data_frequency)
values.append(value)
return values
else:
return self.get_single_spot_value(
assets, field, data_frequency)
def get_single_spot_value(self, asset, field, data_frequency):
"""
Similar to 'get_spot_value' but for a single asset
Note
----
We're writing each minute bar to disk using zipline's machinery.
This is especially useful when running multiple algorithms
concurrently. By using local data when possible, we try to reaching
request limits on exchanges.
:param asset:
:param field:
:param data_frequency:
:return value: The spot value of the given asset / field
"""
log.debug(
'fetching spot value {field} for symbol {symbol}'.format(
symbol=asset.symbol,
field=field
)
)
if field == 'price':
field = 'close'
# Don't use a timezone here
dt = pd.Timestamp.utcnow().floor('1 min')
value = None
if self.minute_reader is not None:
try:
# Slight delay to minimize the chances that multiple algos
# might try to hit the cache at the exact same time.
sleep_time = random.uniform(0.5, 0.8)
sleep(sleep_time)
# TODO: This does not always! Why is that? Open an issue with zipline.
# See: https://github.com/zipline-live/zipline/issues/26
value = self.minute_reader.get_value(
sid=asset.sid,
dt=dt,
field=field
)
except Exception as e:
log.warn('minute data not found: {}'.format(e))
if value is None or np.isnan(value):
ohlc = self.get_candles(data_frequency, asset)
if field not in ohlc:
raise KeyError('Invalid column: %s' % field)
if self.minute_writer is not None:
df = pd.DataFrame(
[ohlc],
index=pd.DatetimeIndex([dt]),
columns=['open', 'high', 'low', 'close', 'volume']
)
try:
self.minute_writer.write_sid(
sid=asset.sid,
df=df
)
log.debug('wrote minute data: {}'.format(dt))
except Exception as e:
log.warn(
'unable to write minute data: {} {}'.format(dt, e))
value = ohlc[field]
log.debug('got spot value: {}'.format(value))
else:
log.debug('got spot value from cache: {}'.format(value))
return value
def get_history_window(self,
assets,
end_dt,
bar_count,
frequency,
field,
data_frequency,
ffill=True):
"""
Public API method that returns a dataframe containing the requested
history window. Data is fully adjusted.
Parameters
----------
assets : list of catalyst.data.Asset objects
The assets whose data is desired.
end_dt: not applicable to cryptocurrencies
bar_count: int
The number of bars desired.
frequency: string
"1d" or "1m"
field: string
The desired field of the asset.
data_frequency: string
The frequency of the data to query; i.e. whether the data is
'daily' or 'minute' bars.
# TODO: fill how?
ffill: boolean
Forward-fill missing values. Only has effect if field
is 'price'.
Returns
-------
A dataframe containing the requested data.
"""
candles = self.get_candles(
data_frequency=frequency,
assets=assets,
bar_count=bar_count,
)
frames = []
for asset in assets:
asset_candles = candles[asset]
asset_data = dict()
asset_data[asset] = map(lambda candle: candle[field],
asset_candles)
dates = map(lambda candle: candle['last_traded'],
asset_candles)
df = pd.DataFrame(asset_data, index=dates)
frames.append(df)
return pd.concat(frames)
@abstractmethod
def order(self, asset, amount, limit_price, stop_price, style):
"""Place an order.
Parameters
----------
asset : Asset
The asset that this order is for.
amount : int
The amount of shares to order. If ``amount`` is positive, this is
the number of shares to buy or cover. If ``amount`` is negative,
this is the number of shares to sell or short.
limit_price : float, optional
The limit price for the order.
stop_price : float, optional
The stop price for the order.
style : ExecutionStyle, optional
The execution style for the order.
Returns
-------
order_id : str or None
The unique identifier for this order, or None if no order was
placed.
Notes
-----
The ``limit_price`` and ``stop_price`` arguments provide shorthands for
passing common execution styles. Passing ``limit_price=N`` is
equivalent to ``style=LimitOrder(N)``. Similarly, passing
``stop_price=M`` is equivalent to ``style=StopOrder(M)``, and passing
``limit_price=N`` and ``stop_price=M`` is equivalent to
``style=StopLimitOrder(N, M)``. It is an error to pass both a ``style``
and ``limit_price`` or ``stop_price``.
See Also
--------
:class:`catalyst.finance.execution.ExecutionStyle`
:func:`catalyst.api.order_value`
:func:`catalyst.api.order_percent`
"""
pass
@abstractmethod
def get_open_orders(self, asset):
"""Retrieve all of the current open orders.
Parameters
----------
asset : Asset
If passed and not None, return only the open orders for the given
asset instead of all open orders.
Returns
-------
open_orders : dict[list[Order]] or list[Order]
If no asset is passed this will return a dict mapping Assets
to a list containing all the open orders for the asset.
If an asset is passed then this will return a list of the open
orders for this asset.
"""
pass
@abstractmethod
def get_order(self, order_id):
"""Lookup an order based on the order id returned from one of the
order functions.
Parameters
----------
order_id : str
The unique identifier for the order.
Returns
-------
order : Order
The order object.
execution_price: float
The execution price per share of the order
"""
pass
@abstractmethod
def cancel_order(self, order_param):
"""Cancel an open order.
Parameters
----------
order_param : str or Order
The order_id or order object to cancel.
"""
pass
@abstractmethod
def get_candles(self, data_frequency, assets, bar_count=None):
"""
Retrieve OHLCV candles for the given assets
:param data_frequency:
:param assets:
:param end_dt:
:param bar_count:
:param limit:
:return:
"""
pass
@abc.abstractmethod
def tickers(self, assets):
"""
Retrieve current tick data for the given assets
:param assets:
:return:
"""
return
+60
View File
@@ -0,0 +1,60 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from time import sleep
import pandas as pd
from catalyst.gens.sim_engine import (
BAR,
SESSION_START,
MINUTE_END,
SESSION_END
)
from logbook import Logger
log = Logger('ExchangeClock')
class ExchangeClock(object):
"""Realtime clock for live trading.
This class is a drop-in replacement for
:class:`zipline.gens.sim_engine.MinuteSimulationClock`.
This is a stripped down version because crypto exchanges run around the clock.
The :param:`time_skew` parameter represents the time difference between
the Broker and the live trading machine's clock.
"""
def __init__(self, sessions, time_skew=pd.Timedelta("0s")):
self.sessions = sessions
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = True
def __iter__(self):
yield pd.Timestamp.utcnow(), SESSION_START
while True:
current_time = pd.Timestamp.utcnow()
current_minute = current_time.floor('1 min')
if self._last_emit is None or current_minute > self._last_emit:
log.debug('emitting minutely bar: {}'.format(current_minute))
self._last_emit = current_minute
yield current_minute, BAR
else:
sleep(1)
+60
View File
@@ -0,0 +1,60 @@
from catalyst.errors import ZiplineError
class ExchangeRequestError(ZiplineError):
msg = (
'Request failed: {error}'
).strip()
class ExchangeRequestErrorTooManyAttempts(ZiplineError):
msg = (
'Request failed: {error}, giving up after {attempts} attempts'
).strip()
class ExchangeBarDataError(ZiplineError):
msg = (
'Unable to retrieve bar data: {data_type}, ' +
'giving up after {attempts} attempts: {error}'
).strip()
class ExchangePortfolioDataError(ZiplineError):
msg = (
'Unable to retrieve portfolio data: {data_type}, ' +
'giving up after {attempts} attempts: {error}'
).strip()
class ExchangeTransactionError(ZiplineError):
msg = (
'Unable to execute transaction: {transaction_type}, ' +
'giving up after {attempts} attempts: {error}'
).strip()
class ExchangeAuthNotFound(ZiplineError):
msg = (
'Please create an auth.json file containing the api token and key for '
'exchange {exchange}. Place the file here: {filename}'
).strip()
class ExchangeSymbolsNotFound(ZiplineError):
msg = (
'Unable to download or find a local copy of symbols.json for exchange '
'{exchange}. The file should be here: {filename}'
).strip()
class AlgoPickleNotFound(ZiplineError):
msg = (
'Pickle not found for algo {algo} in path {filename}'
).strip()
class InvalidHistoryFrequencyError(ZiplineError):
msg = (
'History frequency {frequency} not supported by the exchange.'
).strip()
+87
View File
@@ -0,0 +1,87 @@
import numpy as np
from logbook import Logger
from catalyst.protocol import Portfolio, Positions, Position
log = Logger('ExchangePortfolio')
class ExchangePortfolio(Portfolio):
"""
Since the goal is to support multiple exchanges, it makes sense to
include additional stats in the portfolio object.
Instead of relying on the performance tracker, each exchange portfolio
tracks its own holding. This offers a separation between tracking an
exchange and the statistics of the algorithm.
"""
def __init__(self, start_date, starting_cash=None):
self.capital_used = 0.0
self.starting_cash = starting_cash
self.portfolio_value = starting_cash
self.pnl = 0.0
self.returns = 0.0
self.cash = starting_cash
self.positions = Positions()
self.start_date = start_date
self.positions_value = 0.0
self.open_orders = dict()
def calculate_pnl(self):
log.debug('calculating pnl')
def create_order(self, order):
log.debug('creating order {}'.format(order.id))
self.open_orders[order.id] = order
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
if order_position is None:
order_position = Position(order.asset)
self.positions[order.asset] = order_position
order_position.amount += order.amount
log.debug('open order added to portfolio')
def execute_order(self, order, transaction):
log.debug('executing order {}'.format(order.id))
del self.open_orders[order.id]
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
if order_position is None:
raise ValueError(
'Trying to execute order for a position not held: %s' % order.id
)
self.capital_used += order.amount * transaction.price
if order.amount > 0:
if order_position.cost_basis > 0:
order_position.cost_basis = np.average(
[order_position.cost_basis, transaction.price],
weights=[order_position.amount, order.amount]
)
else:
order_position.cost_basis = transaction.price
log.debug('updated portfolio with executed order')
def remove_order(self, order):
log.info('removing cancelled order {}'.format(order.id))
del self.open_orders[order.id]
order_position = self.positions[order.asset] \
if order.asset in self.positions else None
if order_position is None:
raise ValueError(
'Trying to remove order for a position not held: %s' % order.id
)
order_position.amount -= order.amount
log.debug('removed order from portfolio')
+133
View File
@@ -0,0 +1,133 @@
import json
import os
import pickle
import urllib
from datetime import date, datetime
from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \
ExchangeSymbolsNotFound
from catalyst.utils.paths import data_root, ensure_directory
SYMBOLS_URL = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \
'live-trading/catalyst/exchange/symbols/{exchange}.json'
def get_exchange_folder(exchange_name, environ=None):
if not environ:
environ = os.environ
root = data_root(environ)
exchange_folder = os.path.join(root, 'exchanges', exchange_name)
ensure_directory(exchange_folder)
return exchange_folder
def download_exchange_symbols(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
filename = os.path.join(exchange_folder, 'symbols.json')
url = SYMBOLS_URL.format(exchange=exchange_name)
response = urllib.urlretrieve(url=url, filename=filename)
return response
def get_exchange_symbols(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
filename = os.path.join(exchange_folder, 'symbols.json')
if not os.path.isfile(filename):
download_exchange_symbols(exchange_name, environ)
if os.path.isfile(filename):
with open(filename) as data_file:
data = json.load(data_file)
return data
else:
raise ExchangeSymbolsNotFound(
exchange=exchange_name,
filename=filename
)
def get_exchange_auth(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
filename = os.path.join(exchange_folder, 'auth.json')
if os.path.isfile(filename):
with open(filename) as data_file:
data = json.load(data_file)
return data
else:
raise ExchangeAuthNotFound(
exchange=exchange_name,
filename=filename
)
def get_algo_folder(algo_name, environ=None):
if not environ:
environ = os.environ
root = data_root(environ)
algo_folder = os.path.join(root, 'live_algos', algo_name)
ensure_directory(algo_folder)
return algo_folder
def get_algo_object(algo_name, key, environ=None, rel_path=None):
folder = get_algo_folder(algo_name, environ)
if rel_path is not None:
folder = os.path.join(folder, rel_path)
filename = os.path.join(folder, key + '.p')
if os.path.isfile(filename):
try:
with open(filename, 'rb') as handle:
return pickle.load(handle)
except Exception as e:
return None
else:
return None
def save_algo_object(algo_name, key, obj, environ=None, rel_path=None):
folder = get_algo_folder(algo_name, environ)
if rel_path is not None:
folder = os.path.join(folder, rel_path)
ensure_directory(folder)
filename = os.path.join(folder, key + '.p')
with open(filename, 'wb') as handle:
pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)
def append_algo_object(algo_name, key, obj, environ=None):
algo_folder = get_algo_folder(algo_name, environ)
filename = os.path.join(algo_folder, key + '.p')
mode = 'a+b' if os.path.isfile(filename) else 'wb'
with open(filename, mode) as handle:
pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)
def get_exchange_minute_writer_root(exchange_name, environ=None):
exchange_folder = get_exchange_folder(exchange_name, environ)
minute_data_folder = os.path.join(exchange_folder, 'minute_data')
ensure_directory(minute_data_folder)
return minute_data_folder
def perf_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime, date)):
return obj.isoformat()
raise TypeError("Type %s not serializable" % type(obj))