mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 15:15:51 +08:00
BLD: implementing issue #65, implemented custom exchange data
This commit is contained in:
@@ -38,6 +38,7 @@ from numpy cimport int64_t
|
||||
import warnings
|
||||
cimport numpy as np
|
||||
|
||||
from catalyst.exchange.exchange_utils import get_sid
|
||||
from catalyst.utils.calendars import get_calendar
|
||||
from catalyst.exchange.exchange_errors import InvalidSymbolError, SidHashError
|
||||
|
||||
@@ -503,11 +504,7 @@ cdef class TradingPair(Asset):
|
||||
|
||||
if sid == 0 or sid is None:
|
||||
try:
|
||||
# sid = abs(hash(symbol)) % (10 ** 4)
|
||||
# TODO: try to encode the symbol in the main scope
|
||||
sid = int(
|
||||
hashlib.sha256(symbol.encode('utf-8')).hexdigest(), 16
|
||||
) % 10 ** 6
|
||||
sid = get_sid(symbol)
|
||||
except Exception as e:
|
||||
raise SidHashError(symbol=symbol)
|
||||
|
||||
@@ -559,6 +556,17 @@ cdef class TradingPair(Asset):
|
||||
end_minute=self.end_minute
|
||||
)
|
||||
|
||||
cpdef to_dict(self):
|
||||
"""
|
||||
Convert to a python dict.
|
||||
"""
|
||||
super_dict = super(TradingPair, self).to_dict()
|
||||
super_dict['end_daily'] = self.end_daily
|
||||
super_dict['end_minute'] = self.end_minute
|
||||
super_dict['leverage'] = self.leverage
|
||||
super_dict['min_trade_size'] = self.min_trade_size
|
||||
return super_dict
|
||||
|
||||
def is_exchange_open(self, dt_minute):
|
||||
"""
|
||||
Parameters
|
||||
|
||||
@@ -4,7 +4,10 @@ import logbook
|
||||
|
||||
LOG_LEVEL = logbook.INFO
|
||||
|
||||
SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \
|
||||
'{exchange}/symbols.json'
|
||||
|
||||
DATE_TIME_FORMAT = '%Y-%m-%d %H:%M'
|
||||
DATE_FORMAT = '%Y-%m-%d'
|
||||
|
||||
AUTO_INGEST = False
|
||||
AUTO_INGEST = False
|
||||
|
||||
@@ -1,283 +0,0 @@
|
||||
# For this example, we're going to write a simple momentum script. When the
|
||||
# stock goes up quickly, we're going to buy; when it goes down quickly, we're
|
||||
# going to sell. Hopefully we'll ride the waves.
|
||||
from datetime import timedelta
|
||||
|
||||
import pandas as pd
|
||||
import talib
|
||||
# To run an algorithm in Catalyst, you need two functions: initialize and
|
||||
# handle_data.
|
||||
from logbook import Logger
|
||||
from talib.common import MA_Type
|
||||
|
||||
from catalyst import run_algorithm
|
||||
from catalyst.api import symbol, record, order_target_percent, \
|
||||
get_open_orders
|
||||
# We give a name to the algorithm which Catalyst will use to persist its state.
|
||||
# In this example, Catalyst will create the `.catalyst/data/live_algos`
|
||||
# directory. If we stop and start the algorithm, Catalyst will resume its
|
||||
# state using the files included in the folder.
|
||||
from catalyst.exchange.stats_utils import extract_transactions, trend_direction
|
||||
|
||||
algo_namespace = 'mean_reversion'
|
||||
log = Logger(algo_namespace)
|
||||
|
||||
|
||||
def initialize(context):
|
||||
# This initialize function sets any data or variables that you'll use in
|
||||
# your algorithm. For instance, you'll want to define the trading pair (or
|
||||
# trading pairs) you want to backtest. You'll also want to define any
|
||||
# parameters or values you're going to use.
|
||||
|
||||
# In our example, we're looking at Ether in USD Tether.
|
||||
context.eth_btc = symbol('neo_usd')
|
||||
context.base_price = None
|
||||
context.current_day = None
|
||||
context.trigger = None
|
||||
|
||||
|
||||
def handle_data(context, data):
|
||||
# This handle_data function is where the real work is done. Our data is
|
||||
# minute-level tick data, and each minute is called a frame. This function
|
||||
# runs on each frame of the data.
|
||||
|
||||
# We flag the first period of each day.
|
||||
# Since cryptocurrencies trade 24/7 the `before_trading_starts` handle
|
||||
# would only execute once. This method works with minute and daily
|
||||
# frequencies.
|
||||
today = data.current_dt.floor('1D')
|
||||
if today != context.current_day:
|
||||
context.traded_today = False
|
||||
context.current_day = today
|
||||
|
||||
# We're computing the volume-weighted-average-price of the security
|
||||
# defined above, in the context.eth_btc variable. For this example, we're
|
||||
# using three bars on the 15 min bars.
|
||||
|
||||
# The frequency attribute determine the bar size. We use this convention
|
||||
# for the frequency alias:
|
||||
# http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
|
||||
prices = data.history(
|
||||
context.eth_btc,
|
||||
fields='close',
|
||||
bar_count=50,
|
||||
frequency='15T'
|
||||
)
|
||||
|
||||
# Ta-lib calculates various technical indicator based on price and
|
||||
# volume arrays.
|
||||
|
||||
# In this example, we are comp
|
||||
rsi = talib.RSI(prices.values, timeperiod=14)
|
||||
upper, middle, lower = talib.BBANDS(
|
||||
prices.values,
|
||||
timeperiod=20,
|
||||
nbdevup=2,
|
||||
nbdevdn=2,
|
||||
matype=MA_Type.EMA
|
||||
)
|
||||
|
||||
# We need a variable for the current price of the security to compare to
|
||||
# the average. Since we are requesting two fields, data.current()
|
||||
# returns a DataFrame with
|
||||
current = data.current(context.eth_btc, fields=['close', 'volume'])
|
||||
price = current['close']
|
||||
|
||||
# If base_price is not set, we use the current value. This is the
|
||||
# price at the first bar which we reference to calculate price_change.
|
||||
if context.base_price is None:
|
||||
context.base_price = price
|
||||
|
||||
price_change = (price - context.base_price) / context.base_price
|
||||
cash = context.portfolio.cash
|
||||
|
||||
# Now that we've collected all current data for this frame, we use
|
||||
# the record() method to save it. This data will be available as
|
||||
# a parameter of the analyze() function for further analysis.
|
||||
record(
|
||||
price=price,
|
||||
volume=current['volume'],
|
||||
upper_band=upper[-1],
|
||||
lower_band=lower[-1],
|
||||
price_change=price_change,
|
||||
rsi=rsi[-1],
|
||||
cash=cash
|
||||
)
|
||||
|
||||
# We are trying to avoid over-trading by limiting our trades to
|
||||
# one per day.
|
||||
if context.traded_today:
|
||||
return
|
||||
|
||||
# Since we are using limit orders, some orders may not execute immediately
|
||||
# we wait until all orders are executed before considering more trades.
|
||||
orders = get_open_orders(context.eth_btc)
|
||||
if len(orders) > 0:
|
||||
return
|
||||
|
||||
# Exit if we cannot trade
|
||||
if not data.can_trade(context.eth_btc):
|
||||
return
|
||||
|
||||
# Another powerful built-in feature of the Catalyst backtester is the
|
||||
# portfolio object. The portfolio object tracks your positions, cash,
|
||||
# cost basis of specific holdings, and more. In this line, we calculate
|
||||
# how long or short our position is at this minute.
|
||||
pos_amount = context.portfolio.positions[context.eth_btc].amount
|
||||
|
||||
# In this example, we're using a trigger instead of buying directly after
|
||||
# a signal. Since this is mean reversion, our signals go against the
|
||||
# momentum. Using a trigger allow us to spot the opportunity but trade
|
||||
# only when a trade reversal begins.
|
||||
if context.trigger is not None:
|
||||
# The tread_direction() method determines the trend based on the last
|
||||
# two bars of the series.
|
||||
direction = trend_direction(rsi)
|
||||
if context.trigger[1] == 'buy' and direction == 'up':
|
||||
log.info(
|
||||
'{}: buying - price: {}, rsi: {}, bband: {}'.format(
|
||||
data.current_dt, price, rsi[-1], lower[-1]
|
||||
)
|
||||
)
|
||||
order_target_percent(context.eth_btc, 1)
|
||||
context.traded_today = True
|
||||
context.trigger = None
|
||||
|
||||
elif context.trigger[1] == 'sell' and direction == 'down':
|
||||
log.info(
|
||||
'{}: selling - price: {}, rsi: {}, bband: {}'.format(
|
||||
data.current_dt, price, rsi[-1], upper[-1]
|
||||
)
|
||||
)
|
||||
order_target_percent(context.eth_btc, 0)
|
||||
context.traded_today = True
|
||||
context.trigger = None
|
||||
|
||||
# If we found a signal but no trade reversal within two hours, we
|
||||
# reset the trigger.
|
||||
elif context.trigger[0] + timedelta(hours=2) < data.current_dt:
|
||||
context.trigger = None
|
||||
|
||||
else:
|
||||
# Determining the entry and exit signals based on RSI and SMA
|
||||
if rsi[-1] <= 30 and pos_amount == 0:
|
||||
context.trigger = (data.current_dt, 'buy')
|
||||
|
||||
elif rsi[-1] >= 80 and pos_amount > 0:
|
||||
context.trigger = (data.current_dt, 'sell')
|
||||
|
||||
|
||||
def analyze(context=None, perf=None):
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# The base currency of the algo exchange
|
||||
base_currency = context.exchanges.values()[0].base_currency.upper()
|
||||
|
||||
# Plot the portfolio value over time.
|
||||
ax1 = plt.subplot(611)
|
||||
perf.loc[:, 'portfolio_value'].plot(ax=ax1)
|
||||
ax1.set_ylabel('Portfolio Value ({})'.format(base_currency))
|
||||
|
||||
# Plot the price increase or decrease over time.
|
||||
ax2 = plt.subplot(612, sharex=ax1)
|
||||
perf.loc[:, 'price'].plot(ax=ax2, label='Price')
|
||||
perf.loc[:, 'upper_band'].plot(ax=ax2, label='Upper')
|
||||
perf.loc[:, 'lower_band'].plot(ax=ax2, label='Lower')
|
||||
|
||||
ax2.set_ylabel('{asset} ({base})'.format(
|
||||
asset=context.eth_btc.symbol, base=base_currency
|
||||
))
|
||||
|
||||
transaction_df = extract_transactions(perf)
|
||||
if not transaction_df.empty:
|
||||
buy_df = transaction_df[transaction_df['amount'] > 0]
|
||||
sell_df = transaction_df[transaction_df['amount'] < 0]
|
||||
ax2.scatter(
|
||||
buy_df.index.to_pydatetime(),
|
||||
perf.loc[buy_df.index, 'price'],
|
||||
marker='^',
|
||||
s=100,
|
||||
c='green',
|
||||
label=''
|
||||
)
|
||||
ax2.scatter(
|
||||
sell_df.index.to_pydatetime(),
|
||||
perf.loc[sell_df.index, 'price'],
|
||||
marker='v',
|
||||
s=100,
|
||||
c='red',
|
||||
label=''
|
||||
)
|
||||
|
||||
ax4 = plt.subplot(613, sharex=ax1)
|
||||
perf.loc[:, 'cash'].plot(
|
||||
ax=ax4, label='Base Currency ({})'.format(base_currency)
|
||||
)
|
||||
ax4.set_ylabel('Cash ({})'.format(base_currency))
|
||||
|
||||
perf['algorithm'] = perf.loc[:, 'algorithm_period_return']
|
||||
|
||||
ax5 = plt.subplot(614, sharex=ax1)
|
||||
perf.loc[:, ['algorithm', 'price_change']].plot(ax=ax5)
|
||||
ax5.set_ylabel('Percent Change')
|
||||
|
||||
ax6 = plt.subplot(615, sharex=ax1)
|
||||
perf.loc[:, 'rsi'].plot(ax=ax6, label='RSI')
|
||||
ax6.axhline(70, color='darkgoldenrod')
|
||||
ax6.axhline(30, color='darkgoldenrod')
|
||||
|
||||
if not transaction_df.empty:
|
||||
ax6.scatter(
|
||||
buy_df.index.to_pydatetime(),
|
||||
perf.loc[buy_df.index, 'rsi'],
|
||||
marker='^',
|
||||
s=100,
|
||||
c='green',
|
||||
label=''
|
||||
)
|
||||
ax6.scatter(
|
||||
sell_df.index.to_pydatetime(),
|
||||
perf.loc[sell_df.index, 'rsi'],
|
||||
marker='v',
|
||||
s=100,
|
||||
c='red',
|
||||
label=''
|
||||
)
|
||||
plt.legend(loc=3)
|
||||
|
||||
# Show the plot.
|
||||
plt.gcf().set_size_inches(18, 8)
|
||||
plt.show()
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# The execution mode: backtest or live
|
||||
MODE = 'backtest'
|
||||
|
||||
if MODE == 'backtest':
|
||||
# catalyst run -f catalyst/examples/mean_reversion_simple.py -x poloniex -s 2017-7-1 -e 2017-7-31 -c usdt -n mean-reversion --data-frequency minute --capital-base 10000
|
||||
run_algorithm(
|
||||
capital_base=1,
|
||||
data_frequency='minute',
|
||||
initialize=initialize,
|
||||
handle_data=handle_data,
|
||||
analyze=analyze,
|
||||
exchange_name='bitfinex',
|
||||
algo_namespace=algo_namespace,
|
||||
base_currency='usd',
|
||||
start=pd.to_datetime('2017-10-1', utc=True),
|
||||
end=pd.to_datetime('2017-11-13', utc=True),
|
||||
)
|
||||
|
||||
elif MODE == 'live':
|
||||
run_algorithm(
|
||||
initialize=initialize,
|
||||
handle_data=handle_data,
|
||||
analyze=analyze,
|
||||
exchange_name='bitfinex',
|
||||
live=True,
|
||||
algo_namespace=algo_namespace,
|
||||
base_currency='usd',
|
||||
live_graph=True
|
||||
)
|
||||
@@ -48,7 +48,8 @@ class AssetFinderExchange(object):
|
||||
# log.debug('fetching asset: {}'.format(sid))
|
||||
return list()
|
||||
|
||||
def lookup_symbol(self, symbol, exchange, as_of_date=None, fuzzy=False):
|
||||
def lookup_symbol(self, symbol, exchange, data_frequency=None,
|
||||
as_of_date=None, fuzzy=False):
|
||||
"""Lookup an asset by symbol.
|
||||
|
||||
Parameters
|
||||
@@ -84,10 +85,15 @@ class AssetFinderExchange(object):
|
||||
"""
|
||||
log.debug('looking up symbol: {} {}'.format(symbol, exchange.name))
|
||||
|
||||
key = ','.join([exchange.name, symbol])
|
||||
if data_frequency is not None:
|
||||
key = ','.join([exchange.name, symbol, data_frequency])
|
||||
|
||||
else:
|
||||
key = ','.join([exchange.name, symbol])
|
||||
|
||||
if key in self._asset_cache:
|
||||
return self._asset_cache[key]
|
||||
else:
|
||||
asset = exchange.get_asset(symbol)
|
||||
asset = exchange.get_asset(symbol, data_frequency)
|
||||
self._asset_cache[key] = asset
|
||||
return asset
|
||||
|
||||
@@ -46,8 +46,13 @@ class Bitfinex(Exchange):
|
||||
self.secret = secret.encode('UTF-8')
|
||||
self.name = 'bitfinex'
|
||||
self.color = 'green'
|
||||
self.assets = {}
|
||||
|
||||
self.assets = dict()
|
||||
self.load_assets()
|
||||
|
||||
self.local_assets = dict()
|
||||
self.load_assets(is_local=True)
|
||||
|
||||
self.base_currency = base_currency
|
||||
self._portfolio = portfolio
|
||||
self.minute_writer = None
|
||||
|
||||
@@ -46,6 +46,9 @@ class Bittrex(Exchange):
|
||||
self.assets = dict()
|
||||
self.load_assets()
|
||||
|
||||
self.local_assets = dict()
|
||||
self.load_assets(is_local=True)
|
||||
|
||||
self.bundle = ExchangeBundle(self.name)
|
||||
|
||||
@property
|
||||
|
||||
@@ -9,11 +9,13 @@ import pytz
|
||||
from catalyst.assets._assets import TradingPair
|
||||
|
||||
from catalyst.data.bundles.core import download_without_progress
|
||||
from catalyst.exchange.exchange_utils import get_exchange_bundles_folder
|
||||
from catalyst.exchange.exchange_utils import get_exchange_bundles_folder, \
|
||||
get_exchange_symbols
|
||||
|
||||
EXCHANGE_NAMES = ['bitfinex', 'bittrex', 'poloniex']
|
||||
API_URL = 'http://data.enigma.co/api/v1'
|
||||
|
||||
|
||||
def get_date_from_ms(ms):
|
||||
"""
|
||||
The date from the number of miliseconds from the epoch.
|
||||
|
||||
@@ -16,7 +16,7 @@ from catalyst.exchange.exchange_bundle import ExchangeBundle
|
||||
from catalyst.exchange.exchange_errors import MismatchingBaseCurrencies, \
|
||||
InvalidOrderStyle, BaseCurrencyNotFoundError, SymbolNotFoundOnExchange, \
|
||||
PricingDataNotLoadedError, \
|
||||
NoDataAvailableOnExchange
|
||||
NoDataAvailableOnExchange, ExchangeSymbolsNotFound
|
||||
from catalyst.exchange.exchange_execution import ExchangeStopLimitOrder, \
|
||||
ExchangeLimitOrder, ExchangeStopOrder
|
||||
from catalyst.exchange.exchange_portfolio import ExchangePortfolio
|
||||
@@ -33,7 +33,8 @@ class Exchange:
|
||||
|
||||
def __init__(self):
|
||||
self.name = None
|
||||
self.assets = {}
|
||||
self.assets = dict()
|
||||
self.local_assets = dict()
|
||||
self._portfolio = None
|
||||
self.minute_writer = None
|
||||
self.minute_reader = None
|
||||
@@ -173,7 +174,7 @@ class Exchange:
|
||||
|
||||
return symbols
|
||||
|
||||
def get_assets(self, symbols=None):
|
||||
def get_assets(self, symbols=None, data_frequency=None):
|
||||
"""
|
||||
The list of markets for the specified symbols.
|
||||
|
||||
@@ -190,7 +191,7 @@ class Exchange:
|
||||
|
||||
if symbols is not None:
|
||||
for symbol in symbols:
|
||||
asset = self.get_asset(symbol)
|
||||
asset = self.get_asset(symbol, data_frequency)
|
||||
assets.append(asset)
|
||||
else:
|
||||
for key in self.assets:
|
||||
@@ -198,7 +199,19 @@ class Exchange:
|
||||
|
||||
return assets
|
||||
|
||||
def get_asset(self, symbol):
|
||||
def _find_asset(self, asset, symbol, data_frequency, is_local=False):
|
||||
assets = self.assets if not is_local else self.local_assets
|
||||
|
||||
for key in assets:
|
||||
if not asset and assets[key].symbol.lower() == symbol.lower() and (
|
||||
not data_frequency or (
|
||||
data_frequency == 'minute' and assets[
|
||||
key].end_minute is not None)):
|
||||
asset = assets[key]
|
||||
|
||||
return asset
|
||||
|
||||
def get_asset(self, symbol, data_frequency=None):
|
||||
"""
|
||||
The market for the specified symbol.
|
||||
|
||||
@@ -213,13 +226,17 @@ class Exchange:
|
||||
"""
|
||||
asset = None
|
||||
|
||||
for key in self.assets:
|
||||
if not asset and self.assets[key].symbol.lower() == symbol.lower():
|
||||
asset = self.assets[key]
|
||||
log.debug('searching asset {} on the server')
|
||||
asset = self._find_asset(asset, symbol, data_frequency, False)
|
||||
|
||||
log.debug('asset {} not found on the server, searching local assets')
|
||||
asset = self._find_asset(asset, symbol, data_frequency, True)
|
||||
|
||||
if not asset:
|
||||
all_values = list(self.assets.values()) + \
|
||||
list(self.local_assets.values())
|
||||
supported_symbols = [
|
||||
pair.symbol for pair in list(self.assets.values())
|
||||
asset.symbol for asset in all_values
|
||||
]
|
||||
|
||||
raise SymbolNotFoundOnExchange(
|
||||
@@ -230,10 +247,10 @@ class Exchange:
|
||||
|
||||
return asset
|
||||
|
||||
def fetch_symbol_map(self):
|
||||
return get_exchange_symbols(self.name)
|
||||
def fetch_symbol_map(self, is_local=False):
|
||||
return get_exchange_symbols(self.name, is_local)
|
||||
|
||||
def load_assets(self):
|
||||
def load_assets(self, is_local=False):
|
||||
"""
|
||||
Populate the 'assets' attribute with a dictionary of Assets.
|
||||
The key of the resulting dictionary is the exchange specific
|
||||
@@ -246,11 +263,15 @@ class Exchange:
|
||||
universal symbol. This simple approach avoids maintaining a mapping
|
||||
of sids.
|
||||
|
||||
This method can be overridden if an exchange offers equivalent data
|
||||
This method can be omerridden if an exchange offers equivalent data
|
||||
via its api.
|
||||
|
||||
"""
|
||||
symbol_map = self.fetch_symbol_map()
|
||||
try:
|
||||
symbol_map = self.fetch_symbol_map(is_local)
|
||||
except ExchangeSymbolsNotFound:
|
||||
return None
|
||||
|
||||
for exchange_symbol in symbol_map:
|
||||
asset = symbol_map[exchange_symbol]
|
||||
|
||||
@@ -302,7 +323,10 @@ class Exchange:
|
||||
exchange_symbol=exchange_symbol
|
||||
)
|
||||
|
||||
self.assets[exchange_symbol] = trading_pair
|
||||
if is_local:
|
||||
self.local_assets[exchange_symbol] = trading_pair
|
||||
else:
|
||||
self.assets[exchange_symbol] = trading_pair
|
||||
|
||||
def check_open_orders(self):
|
||||
"""
|
||||
|
||||
@@ -117,6 +117,7 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm):
|
||||
return self.asset_finder.lookup_symbol(
|
||||
symbol=symbol_str,
|
||||
exchange=exchange,
|
||||
data_frequency=self.data_frequency,
|
||||
as_of_date=_lookup_date
|
||||
)
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import json
|
||||
import os
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime, timedelta
|
||||
@@ -8,6 +8,7 @@ from operator import is_not
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pytz
|
||||
from catalyst.assets._assets import TradingPair
|
||||
from logbook import Logger
|
||||
from pytz import UTC
|
||||
@@ -29,7 +30,7 @@ from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
|
||||
NoDataAvailableOnExchange, \
|
||||
PricingDataNotLoadedError, DataCorruptionError, ExchangeSymbolsNotFound
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder, \
|
||||
get_exchange_symbols, perf_serial, symbols_serial
|
||||
get_exchange_symbols, save_exchange_symbols
|
||||
from catalyst.utils.cli import maybe_show_progress
|
||||
from catalyst.utils.paths import ensure_directory
|
||||
|
||||
@@ -647,7 +648,8 @@ class ExchangeBundle:
|
||||
'\n'.join(problems)
|
||||
))
|
||||
|
||||
def ingest_csv(self, path, data_frequency):
|
||||
def ingest_csv(self, path, data_frequency, empty_rows_behavior='strip',
|
||||
duplicates_threshold=100):
|
||||
"""
|
||||
Ingest price data from a CSV file.
|
||||
|
||||
@@ -686,12 +688,19 @@ class ExchangeBundle:
|
||||
parse_dates=['last_traded'],
|
||||
index_col=None
|
||||
)
|
||||
min_start_dt = None
|
||||
max_end_dt = None
|
||||
|
||||
symbols = df['symbol'].unique()
|
||||
trading_pairs = dict()
|
||||
|
||||
# Apply the timezone before creating an index for simplicity
|
||||
df['last_traded'] = df['last_traded'].dt.tz_localize(pytz.UTC)
|
||||
df.set_index(['symbol', 'last_traded'], drop=True, inplace=True)
|
||||
|
||||
assets = dict()
|
||||
for symbol in symbols:
|
||||
start_dt = df['last_traded'].min()
|
||||
end_dt = df['last_traded'].max()
|
||||
start_dt = df.index.get_level_values(1).min()
|
||||
end_dt = df.index.get_level_values(1).max()
|
||||
end_dt_key = 'end_{}'.format(data_frequency)
|
||||
|
||||
if symbol is symbols_def:
|
||||
@@ -710,10 +719,16 @@ class ExchangeBundle:
|
||||
if data_frequency == 'minute' else symbol_def['end_minute']
|
||||
|
||||
else:
|
||||
end_daily = end_dt if data_frequency == 'daily' else None
|
||||
end_minute = end_dt if data_frequency == 'minute' else None
|
||||
end_daily = end_dt if data_frequency == 'daily' else 'N/A'
|
||||
end_minute = end_dt if data_frequency == 'minute' else 'N/A'
|
||||
|
||||
trading_pair = TradingPair(
|
||||
if min_start_dt is None or start_dt < min_start_dt:
|
||||
min_start_dt = start_dt
|
||||
|
||||
if max_end_dt is None or end_dt > max_end_dt:
|
||||
max_end_dt = end_dt
|
||||
|
||||
asset = TradingPair(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
start_date=start_dt,
|
||||
@@ -725,19 +740,42 @@ class ExchangeBundle:
|
||||
end_minute=end_minute,
|
||||
exchange_symbol=symbol
|
||||
)
|
||||
trading_pairs[symbol] = trading_pair.to_dict()
|
||||
assets[symbol] = asset
|
||||
|
||||
symbols_def_json = json.dumps(trading_pairs, default=symbols_serial)
|
||||
df.set_index(['symbol', 'last_traded'], drop=True, inplace=True)
|
||||
df.tz_localize('UTC', level=1)
|
||||
# problems += self.ingest_df(
|
||||
# ohlcv_df=df,
|
||||
# data_frequency=data_frequency,
|
||||
# asset=asset,
|
||||
# writer=writer,
|
||||
# empty_rows_behavior=empty_rows_behavior,
|
||||
# duplicates_threshold=duplicates_threshold
|
||||
# )
|
||||
save_exchange_symbols(self.exchange_name, assets, True)
|
||||
|
||||
writer = self.get_writer(
|
||||
start_dt=min_start_dt.replace(hour=00, minute=00),
|
||||
end_dt=max_end_dt.replace(hour=23, minute=59),
|
||||
data_frequency=data_frequency
|
||||
)
|
||||
|
||||
for symbol in assets:
|
||||
asset = assets[symbol]
|
||||
ohlcv_df = df.loc[
|
||||
(df.index.get_level_values(0) == symbol)
|
||||
] # type: pd.DataFrame
|
||||
ohlcv_df.index = ohlcv_df.index.droplevel(0)
|
||||
|
||||
period_start = start_dt.replace(hour=00, minute=00)
|
||||
period_end = end_dt.replace(hour=23, minute=59)
|
||||
periods = self.get_calendar_periods_range(
|
||||
period_start, period_end, data_frequency
|
||||
)
|
||||
|
||||
# We're not really resampling but ensuring that each frame
|
||||
# contains data
|
||||
ohlcv_df = ohlcv_df.reindex(periods, method='ffill')
|
||||
ohlcv_df['volume'] = ohlcv_df['volume'].fillna(0)
|
||||
|
||||
problems += self.ingest_df(
|
||||
ohlcv_df=ohlcv_df,
|
||||
data_frequency=data_frequency,
|
||||
asset=asset,
|
||||
writer=writer,
|
||||
empty_rows_behavior=empty_rows_behavior,
|
||||
duplicates_threshold=duplicates_threshold
|
||||
)
|
||||
return filter(partial(is_not, None), problems)
|
||||
|
||||
def ingest(self, data_frequency, include_symbols=None,
|
||||
@@ -1018,6 +1056,10 @@ class ExchangeBundle:
|
||||
if os.path.isfile(symbols):
|
||||
os.remove(symbols)
|
||||
|
||||
local_symbols = os.path.join(root, 'symbols_local.json')
|
||||
if os.path.isfile(local_symbols):
|
||||
os.remove(local_symbols)
|
||||
|
||||
temp_bundles = os.path.join(root, 'temp_bundles')
|
||||
|
||||
if os.path.isdir(temp_bundles):
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
@@ -9,14 +10,31 @@ import pandas as pd
|
||||
from catalyst.assets._assets import TradingPair
|
||||
from six.moves.urllib import request
|
||||
|
||||
from catalyst.constants import DATE_TIME_FORMAT, DATE_FORMAT
|
||||
from catalyst.constants import DATE_FORMAT, SYMBOLS_URL
|
||||
from catalyst.exchange.exchange_errors import ExchangeSymbolsNotFound, \
|
||||
InvalidHistoryFrequencyError, InvalidHistoryFrequencyAlias
|
||||
from catalyst.utils.paths import data_root, ensure_directory, \
|
||||
last_modified_time
|
||||
|
||||
SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \
|
||||
'{exchange}/symbols.json'
|
||||
|
||||
def get_sid(symbol):
|
||||
"""
|
||||
Create a sid by hashing the symbol of a currency pair.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
The resulting sid.
|
||||
|
||||
"""
|
||||
sid = int(
|
||||
hashlib.sha256(symbol.encode('utf-8')).hexdigest(), 16
|
||||
) % 10 ** 6
|
||||
return sid
|
||||
|
||||
|
||||
def get_exchange_folder(exchange_name, environ=None):
|
||||
@@ -106,8 +124,12 @@ def get_exchange_symbols(exchange_name, is_local=False, environ=None):
|
||||
|
||||
if os.path.isfile(filename):
|
||||
with open(filename) as data_file:
|
||||
data = json.load(data_file)
|
||||
return data
|
||||
try:
|
||||
data = json.load(data_file)
|
||||
return data
|
||||
|
||||
except ValueError:
|
||||
return dict()
|
||||
else:
|
||||
raise ExchangeSymbolsNotFound(
|
||||
exchange=exchange_name,
|
||||
@@ -115,6 +137,32 @@ def get_exchange_symbols(exchange_name, is_local=False, environ=None):
|
||||
)
|
||||
|
||||
|
||||
def save_exchange_symbols(exchange_name, assets, is_local=False, environ=None):
|
||||
"""
|
||||
Save assets into an exchange_symbols file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
exchange_name: str
|
||||
assets: list[dict[str, object]]
|
||||
is_local: bool
|
||||
environ
|
||||
|
||||
Returns
|
||||
-------
|
||||
|
||||
"""
|
||||
asset_dicts = dict()
|
||||
for symbol in assets:
|
||||
asset_dicts[symbol] = assets[symbol].to_dict()
|
||||
|
||||
filename = get_exchange_symbols_filename(
|
||||
exchange_name, is_local, environ
|
||||
)
|
||||
with open(filename, 'wt') as handle:
|
||||
json.dump(asset_dicts, handle, indent=4, default=symbols_serial)
|
||||
|
||||
|
||||
def get_symbols_string(assets):
|
||||
"""
|
||||
A concatenated string of symbols from a list of assets.
|
||||
|
||||
@@ -35,8 +35,13 @@ class Poloniex(Exchange):
|
||||
def __init__(self, key, secret, base_currency, portfolio=None):
|
||||
self.api = Poloniex_api(key=key, secret=secret)
|
||||
self.name = 'poloniex'
|
||||
self.assets = {}
|
||||
|
||||
self.assets = dict()
|
||||
self.load_assets()
|
||||
|
||||
self.local_assets = dict()
|
||||
self.load_assets(is_local=True)
|
||||
|
||||
self.base_currency = base_currency
|
||||
self._portfolio = portfolio
|
||||
self.minute_writer = None
|
||||
|
||||
@@ -448,7 +448,7 @@ class TestExchangeBundle:
|
||||
end_dt = pd.to_datetime('2016-6-1', utc=True)
|
||||
self._bundle_to_csv(
|
||||
asset=asset,
|
||||
exchange=exchange,
|
||||
exchange_name=exchange.name,
|
||||
data_frequency=data_frequency,
|
||||
filename='{}_{}_{}'.format(
|
||||
exchange_name, data_frequency, asset.symbol
|
||||
@@ -474,16 +474,16 @@ class TestExchangeBundle:
|
||||
)
|
||||
self._bundle_to_csv(
|
||||
asset=asset,
|
||||
exchange=exchange,
|
||||
exchange_name=exchange.name,
|
||||
data_frequency=data_frequency,
|
||||
path=path,
|
||||
filename=period
|
||||
)
|
||||
pass
|
||||
|
||||
def _bundle_to_csv(self, asset, exchange, data_frequency, filename,
|
||||
def _bundle_to_csv(self, asset, exchange_name, data_frequency, filename,
|
||||
path=None, start_dt=None, end_dt=None):
|
||||
bundle = ExchangeBundle(exchange)
|
||||
bundle = ExchangeBundle(exchange_name)
|
||||
reader = bundle.get_reader(data_frequency, path=path)
|
||||
|
||||
if start_dt is None:
|
||||
@@ -514,24 +514,39 @@ class TestExchangeBundle:
|
||||
df = get_df_from_arrays(arrays, periods)
|
||||
|
||||
folder = os.path.join(
|
||||
tempfile.gettempdir(), 'catalyst', exchange.name, asset.symbol
|
||||
tempfile.gettempdir(), 'catalyst', exchange_name, asset.symbol
|
||||
)
|
||||
ensure_directory(folder)
|
||||
|
||||
path = os.path.join(folder, filename + '.csv')
|
||||
|
||||
log.info('creating csv file: {}'.format(path))
|
||||
print('HEAD\n{}'.format(df.head(10)))
|
||||
print('TAIL\n{}'.format(df.tail(10)))
|
||||
print('HEAD\n{}'.format(df.head(100)))
|
||||
print('TAIL\n{}'.format(df.tail(100)))
|
||||
df.to_csv(path)
|
||||
pass
|
||||
|
||||
def test_ingest_csv(self):
|
||||
data_frequency = 'minute'
|
||||
exchange_name = 'bittrex'
|
||||
path = '/Users/fredfortier/Dropbox/Enigma/Data/bat_eth.csv'
|
||||
path = '/Users/fredfortier/Dropbox/Enigma/Data/bittrex_bat_eth.csv'
|
||||
|
||||
exchange_bundle = ExchangeBundle(exchange_name)
|
||||
exchange_bundle.ingest_csv(path, data_frequency)
|
||||
|
||||
exchange = get_exchange(exchange_name)
|
||||
asset = exchange.get_asset('bat_eth')
|
||||
|
||||
start_dt = pd.to_datetime('2017-6-3', utc=True)
|
||||
end_dt = pd.to_datetime('2017-8-3 19:24', utc=True)
|
||||
self._bundle_to_csv(
|
||||
asset=asset,
|
||||
exchange_name=exchange.name,
|
||||
data_frequency=data_frequency,
|
||||
filename='{}_{}_{}'.format(
|
||||
exchange_name, data_frequency, asset.symbol
|
||||
),
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt
|
||||
)
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user