mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-27 19:30:28 +08:00
Untold improvements
This commit is contained in:
@@ -4,6 +4,7 @@ from datetime import datetime
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandas_datareader.data import DataReader
|
||||
from pandas.tseries.offsets import DateOffset
|
||||
import requests
|
||||
|
||||
from catalyst.utils.calendars import register_calendar_alias
|
||||
@@ -88,19 +89,30 @@ def poloniex_cryptoassets(symbols, start=None, end=None):
|
||||
|
||||
for symbol in symbols:
|
||||
#def to_dataframe(self, start, end, currencyPair=None):
|
||||
csv_fn = '/var/tmp/' + 'crypto_prices-' + symbol + '.csv' # TODO: DIR as parameter
|
||||
csv_fn = '/var/tmp/catalyst/data/poloniex/crypto_prices-' + symbol + '.csv' # TODO: DIR as parameter
|
||||
#last_date = self._get_start_date(csv_fn)
|
||||
#if last_date + 300 < end or not os.path.exists(csv_fn):
|
||||
# get latest data
|
||||
#self.append_data_single_pair(currencyPair)
|
||||
|
||||
# CSV holds the latest snapshot
|
||||
df = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume'])
|
||||
df['date']=pd.to_datetime(df['date'], utc=True, unit='s')
|
||||
df.set_index('date', inplace=True)
|
||||
data = pd.read_csv(csv_fn, names=['date', 'open', 'high', 'low', 'close', 'volume'])
|
||||
data['date'] = pd.to_datetime(data['date'], utc=True, unit='s')
|
||||
data.set_index('date', inplace=True)
|
||||
|
||||
#df = df.resample('D').mean()
|
||||
df = df.loc[df.index.isin(calendar.schedule.index)]
|
||||
df = data.loc[data.index.isin(calendar.schedule.index)]
|
||||
|
||||
offset = DateOffset(days=1)
|
||||
for start_date in df.index:
|
||||
end_date = start_date + offset
|
||||
day_data = data[start_date:end_date]
|
||||
|
||||
df[start_date]['open'] = day_data[0]['open']
|
||||
df[start_date]['high'] = day_data['high'].max()
|
||||
df[start_date]['low'] = day_data['low'].min()
|
||||
df[start_date]['close'] = day_data[-1]['close']
|
||||
df[start_date]['volume'] = day_data['volume'].sum()
|
||||
|
||||
# the start date is the date of the first trade and
|
||||
# the end date is the date of the last trade
|
||||
@@ -158,7 +170,7 @@ def poloniex_cryptoassets(symbols, start=None, end=None):
|
||||
symbol_map = pd.Series(metadata.symbol.index, metadata.symbol)
|
||||
|
||||
# Hardcode the exchange to "POLO" for all assets and (elsewhere)
|
||||
# register "YAHOO" to resolve to the OPEN calendar, because these are
|
||||
# register "POLO" to resolve to the OPEN calendar, because these are
|
||||
# all cryptoassets and thus use the OPEN calendar.
|
||||
metadata['exchange'] = 'POLO'
|
||||
asset_db_writer.write(equities=metadata)
|
||||
|
||||
@@ -18,6 +18,7 @@ from collections import OrderedDict
|
||||
import logbook
|
||||
import pandas as pd
|
||||
from pandas_datareader.data import DataReader
|
||||
import datetime
|
||||
import pytz
|
||||
from six import iteritems
|
||||
from six.moves.urllib_error import HTTPError
|
||||
@@ -91,6 +92,56 @@ def has_data_for_dates(series_or_df, first_date, last_date):
|
||||
first, last = dts[[0, -1]]
|
||||
return (first <= first_date) and (last >= last_date)
|
||||
|
||||
def load_crypto_market_data(trading_day=None,
                            trading_days=None,
                            bm_symbol='USDT_BTC',
                            environ=None):
    """
    Load benchmark returns and treasury curves using the 'OPEN' calendar.

    Parameters
    ----------
    trading_day : optional
        Trading-day offset. Defaults to ``get_calendar('OPEN').trading_day``.
    trading_days : optional
        Index of sessions. Defaults to
        ``get_calendar('OPEN').all_sessions``.
    bm_symbol : str, optional
        Symbol used for the benchmark; 'USDT_BTC' by default.
    environ : optional
        Forwarded unchanged to the ``ensure_*`` helpers.

    Returns
    -------
    (benchmark_returns, treasury_curves)
        Both are sliced between the first session in ``trading_days`` and
        the session two positions before the one containing "now".
    """
    if trading_day is None:
        trading_day = get_calendar('OPEN').trading_day
    if trading_days is None:
        trading_days = get_calendar('OPEN').all_sessions

    first_date = trading_days[0]
    now = pd.Timestamp.utcnow()

    # Data for the most recently completed session may not have been
    # published yet when this runs, so be conservative: only require the
    # cache to be current up to **two** sessions before the session that
    # contains `now`. A cache whose latest entry is older than that date
    # triggers a reload.
    current_session_pos = trading_days.get_loc(now, method='ffill')
    last_date = trading_days[current_session_pos - 2]

    # `trading_day` is needed by the benchmark helper to reach the close
    # prior to `first_date`, so returns can be computed for the first date.
    benchmark = ensure_crypto_benchmark_data(
        bm_symbol, first_date, last_date, now, trading_day, environ,
    )
    treasury = ensure_treasury_data(
        bm_symbol, first_date, last_date, now, environ,
    )

    benchmark_window = benchmark.index.slice_indexer(first_date, last_date)
    benchmark_returns = benchmark[benchmark_window]
    treasury_window = treasury.index.slice_indexer(first_date, last_date)
    treasury_curves = treasury[treasury_window]
    return benchmark_returns, treasury_curves
|
||||
|
||||
|
||||
|
||||
def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY',
|
||||
environ=None):
|
||||
@@ -177,6 +228,135 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY',
|
||||
treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)]
|
||||
return benchmark_returns, treasury_curves
|
||||
|
||||
def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
                                 trading_day, environ=None):
    """
    Ensure we have benchmark returns for the crypto pair `symbol`.

    The cache is consulted first. On a miss, the local Poloniex price CSV
    for `symbol` is read, reduced to its 'close' column, converted to
    period-over-period percentage changes and written to the cache.

    Parameters
    ----------
    symbol : str
        The currency pair to use as the benchmark (e.g. 'USDT_BTC').
    first_date : pd.Timestamp
        First required date for the cache.
    last_date : pd.Timestamp
        Last required date for the cache.
    now : pd.Timestamp
        The current time; forwarded to the cache loader.
    trading_day : offset
        A trading day delta. Used to include the close prior to
        `first_date` so a return exists for `first_date` itself.
    environ : optional
        Forwarded to the cache loader and to the cache path lookup.

    Returns
    -------
    The cached data if the cache satisfied the request; otherwise a
    one-column ('close') frame of percentage changes.

    Raises
    ------
    OSError, IOError, HTTPError
        Re-raised after logging if reading the CSV or writing the cache
        fails.
    """
    filename = get_benchmark_filename(symbol)
    source_filename = (
        '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'
        .format(symbol)
    )

    logger.info(
        ('Loading benchmark data for {symbol!r} '
         'from {first_date} to {last_date}'),
        symbol=symbol,
        first_date=first_date - trading_day,
        last_date=last_date
    )

    data = _load_cached_data(
        filename,
        first_date,
        last_date,
        now,
        'benchmark',
        environ,
    )

    if data is not None:
        # Diagnostics go through the logger rather than stdout.
        logger.debug('benchmark data:\n{head}', head=data.head())
        return data

    # If no cached data was found or it was missing any dates then rebuild
    # it from the price CSV.
    logger.info(
        ('Downloading benchmark data for {symbol!r} '
         'from {first_date} to {last_date}'),
        symbol=symbol,
        first_date=first_date - trading_day,
        last_date=last_date
    )

    def dateparse(time_in_secs):
        # The CSV's first column holds epoch seconds; make them UTC-aware.
        return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc)

    try:
        data = pd.read_csv(
            source_filename,
            names=['date', 'open', 'high', 'low', 'close', 'volume'],
            index_col=[0],
            parse_dates=True,
            date_parser=dateparse,
        )
        data = data[['close']]

        logger.debug('loaded benchmark data:\n{index}', index=data.index)

        # Keep one extra trading day before first_date so pct_change has a
        # previous close to work from.
        data = data[
            (data.index >= (first_date - trading_day)) &
            (data.index <= last_date)
        ]
        # The first row has no predecessor and would be NaN; drop it.
        data = data.pct_change(1).iloc[1:]

        logger.debug('writing benchmark data:\n{head}', head=data.head())

        data.to_csv(get_data_filepath(filename, environ))
    except (OSError, IOError, HTTPError):
        logger.exception('Failed to cache the new benchmark returns')
        raise
    if not has_data_for_dates(data, first_date, last_date):
        logger.warn("Still don't have expected data after redownload!")
    return data
|
||||
|
||||
|
||||
def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day,
                          environ=None):
    """
    Make sure benchmark data for `symbol` covers `first_date`..`last_date`.

    Parameters
    ----------
    symbol : str
        The symbol for the benchmark to load.
    first_date : pd.Timestamp
        First required date for the cache.
    last_date : pd.Timestamp
        Last required date for the cache.
    now : pd.Timestamp
        The current time. This is used to prevent repeated attempts to
        re-download data that isn't available due to scheduling quirks or
        other failures.
    trading_day : pd.CustomBusinessDay
        A trading day delta. Used to find the day before first_date so we
        can get the close of the day prior to first_date.

    A download is attempted unless the data cache already holds data for
    `symbol` whose first entry is on or before `first_date` and whose last
    entry is on or after `last_date`.

    If a download happens and the cache criteria are still not satisfied,
    at least one hour passes before another download is attempted. This is
    determined by comparing the current time to the result of
    os.path.getmtime on the cache path.
    """
    filename = get_benchmark_filename(symbol)
    cached = _load_cached_data(
        filename, first_date, last_date, now, 'benchmark', environ,
    )
    if cached is not None:
        return cached

    # Nothing usable in the cache (absent, or missing dates): download.
    # Start one trading day early to have the close prior to first_date.
    download_start = first_date - trading_day
    logger.info(
        ('Downloading benchmark data for {symbol!r} '
         'from {first_date} to {last_date}'),
        symbol=symbol,
        first_date=download_start,
        last_date=last_date
    )

    try:
        data = get_benchmark_returns(symbol, download_start, last_date)
        data.to_csv(get_data_filepath(filename, environ))
    except (OSError, IOError, HTTPError):
        logger.exception('Failed to cache the new benchmark returns')
        raise

    covered = has_data_for_dates(data, first_date, last_date)
    if not covered:
        logger.warn("Still don't have expected data after redownload!")
    return data
|
||||
|
||||
def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day,
|
||||
environ=None):
|
||||
|
||||
@@ -13,21 +13,102 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from catalyst.api import order, symbol
|
||||
|
||||
import numpy as np
|
||||
|
||||
from catalyst.api import (
|
||||
order,
|
||||
symbol,
|
||||
record,
|
||||
)
|
||||
|
||||
stocks = ['USDT_BTC']
|
||||
|
||||
TARGET_INVESTMENT_RATIO = 0.1
|
||||
|
||||
def initialize(context):
|
||||
context.has_ordered = False
|
||||
context.stocks = stocks
|
||||
context.asset = symbol('USDT_BTC')
|
||||
|
||||
|
||||
def handle_data(context, data):
|
||||
if not context.has_ordered:
|
||||
for stock in context.stocks:
|
||||
order(symbol(stock), 100)
|
||||
context.has_ordered = True
|
||||
price = data[context.asset].price
|
||||
amt = TARGET_INVESTMENT_RATIO * (context.portfolio.cash / price)
|
||||
if not np.isnan(amt):
|
||||
print 'amt:', amt
|
||||
order(context.asset, amt, limit_price=price*1.5)
|
||||
context.has_ordered = True
|
||||
|
||||
record(
|
||||
USDT_BTC=data[context.asset].price,
|
||||
cash=context.portfolio.cash,
|
||||
leverage=context.account.leverage,
|
||||
)
|
||||
|
||||
def analyze(context=None, results=None):
    """
    Plot the backtest results in five stacked panels and show the figure.

    Parameters
    ----------
    context : optional
        Unused here.
    results : DataFrame
        Must provide the columns read below: 'portfolio_value',
        'USDT_BTC', 'transactions', 'leverage', 'alpha', 'beta', 'cash'
        and the three '*_period_return' columns. Three alias columns
        ('treasury', 'algorithm', 'benchmark') are added to it in place.
    """
    import matplotlib.pyplot as plt

    # Plot the portfolio and asset data.
    ax1 = plt.subplot(511)
    results[['portfolio_value']].plot(ax=ax1)
    ax1.set_ylabel('Portfolio value (USD)')

    ax2 = plt.subplot(512, sharex=ax1)
    ax2.set_ylabel('USDT_BTC (USD)')
    results[['USDT_BTC']].plot(ax=ax2)

    # Rows with at least one transaction. `.loc` with a boolean list
    # replaces the deprecated `.ix` indexer.
    trans = results.loc[[t != [] for t in results.transactions]]
    # Classification looks at the first transaction of each row only.
    buys = trans.loc[
        [t[0]['amount'] > 0 for t in trans.transactions]
    ]
    sells = trans.loc[
        [t[0]['amount'] < 0 for t in trans.transactions]
    ]
    # Same output as the Python 2 statement `print 'buys:', buys.head()`,
    # but also valid on Python 3.
    print('buys: {}'.format(buys.head()))
    ax2.plot(
        buys.index, results.USDT_BTC[buys.index],
        '^',
        markersize=10,
        color='m',
    )
    ax2.plot(
        sells.index, results.USDT_BTC[sells.index],
        'v',
        markersize=10,
        color='k',
    )

    ax3 = plt.subplot(513, sharex=ax1)
    results[['leverage', 'alpha', 'beta']].plot(ax=ax3)
    ax3.set_ylabel('Leverage (USD)')

    ax4 = plt.subplot(514, sharex=ax1)
    results[['cash']].plot(ax=ax4)
    ax4.set_ylabel('Cash (USD)')

    # Copy the period-return columns under shorter names for the legend.
    results[[
        'treasury',
        'algorithm',
        'benchmark',
    ]] = results[[
        'treasury_period_return',
        'algorithm_period_return',
        'benchmark_period_return',
    ]]

    ax5 = plt.subplot(515, sharex=ax1)
    results[[
        'treasury',
        'algorithm',
        'benchmark',
    ]].plot(ax=ax5)
    ax5.set_ylabel('Dollars (USD)')

    plt.legend(loc=3)

    # Show the plot.
    plt.gcf().set_size_inches(18, 8)
    plt.show()
|
||||
|
||||
|
||||
def _test_args():
|
||||
|
||||
@@ -46,7 +46,7 @@ SHORT_WINDOW = 30
|
||||
LONG_WINDOW = 100
|
||||
|
||||
def initialize(context):
|
||||
context.asset = symbol('USDT_BTC')
|
||||
context.asset = symbol('USDT_LTC')
|
||||
context.i = 0
|
||||
|
||||
set_commission(PerDollar(cost=0.001))
|
||||
@@ -105,7 +105,7 @@ def rebalance(context, data):
|
||||
order_target_percent(context.asset, 0.0)
|
||||
|
||||
record(
|
||||
USDT_BTC=price,
|
||||
USDT_LTC=price,
|
||||
cash=context.portfolio.cash,
|
||||
leverage=context.account.leverage,
|
||||
short_mavg=short_mavg,
|
||||
@@ -124,8 +124,8 @@ def analyze(context=None, results=None):
|
||||
ax1.set_ylabel('Portfolio value (USD)')
|
||||
|
||||
ax2 = plt.subplot(512, sharex=ax1)
|
||||
ax2.set_ylabel('USDT_BTC (USD)')
|
||||
results[['USDT_BTC', 'short_mavg', 'long_mavg']].plot(ax=ax2)
|
||||
ax2.set_ylabel('USDT_LTC (USD)')
|
||||
results[['USDT_LTC', 'short_mavg', 'long_mavg']].plot(ax=ax2)
|
||||
|
||||
trans = results.ix[[t != [] for t in results.transactions]]
|
||||
buys = trans.ix[
|
||||
@@ -136,13 +136,13 @@ def analyze(context=None, results=None):
|
||||
]
|
||||
print 'buys:', buys.head()
|
||||
ax2.plot(
|
||||
buys.index, results.USDT_BTC[buys.index],
|
||||
buys.index, results.USDT_LTC[buys.index],
|
||||
'^',
|
||||
markersize=10,
|
||||
color='m',
|
||||
)
|
||||
ax2.plot(
|
||||
sells.index, results.USDT_BTC[sells.index],
|
||||
sells.index, results.USDT_LTC[sells.index],
|
||||
'v',
|
||||
markersize=10,
|
||||
color='k',
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
from datetime import time
|
||||
from pytz import timezone
|
||||
|
||||
from .trading_calendar import TradingCalendar
|
||||
from pandas.tseries.offsets import DateOffset
|
||||
|
||||
from catalyst.utils.memoize import lazyval
|
||||
|
||||
from .trading_calendar import TradingCalendar
|
||||
|
||||
|
||||
class OpenExchangeCalendar(TradingCalendar):
|
||||
@property
|
||||
@@ -25,4 +27,4 @@ class OpenExchangeCalendar(TradingCalendar):
|
||||
|
||||
@lazyval
|
||||
def day(self):
|
||||
return 'D'
|
||||
return DateOffset(days=1)
|
||||
|
||||
@@ -13,10 +13,12 @@ try:
|
||||
except:
|
||||
PYGMENTS = False
|
||||
from toolz import valfilter, concatv
|
||||
from functools import partial
|
||||
|
||||
from catalyst.algorithm import TradingAlgorithm
|
||||
from catalyst.data.bundles.core import load
|
||||
from catalyst.data.data_portal import DataPortal
|
||||
from catalyst.data.loader import load_crypto_market_data
|
||||
from catalyst.finance.trading import TradingEnvironment
|
||||
from catalyst.pipeline.data import USEquityPricing, CryptoPricing
|
||||
from catalyst.pipeline.loaders import (
|
||||
@@ -143,9 +145,17 @@ def _run(handle_data,
|
||||
"invalid url %r, must begin with 'sqlite:///'" %
|
||||
str(bundle_data.asset_finder.engine.url),
|
||||
)
|
||||
env = TradingEnvironment(asset_db_path=connstr, environ=environ)
|
||||
|
||||
env = TradingEnvironment(
|
||||
#load=partial(load_crypto_market_data, environ=environ),
|
||||
#bm_symbol='USDT_BTC',
|
||||
asset_db_path=connstr,
|
||||
environ=environ,
|
||||
)
|
||||
|
||||
first_trading_day =\
|
||||
bundle_data.equity_minute_bar_reader.first_trading_day
|
||||
|
||||
data = DataPortal(
|
||||
env.asset_finder,
|
||||
get_calendar('NYSE'),
|
||||
|
||||
@@ -10,7 +10,7 @@ import catalyst.data.bundles.core as bundles
|
||||
|
||||
# Earliest timestamp (epoch seconds, from local midnight 2010-01-01).
# Plain `1` instead of `01`: zero-prefixed literals are octal in Python 2
# and a SyntaxError in Python 3.
DT_START = time.mktime(datetime(2010, 1, 1, 0, 0).timetuple())
|
||||
# DT_START = time.mktime(datetime(2017, 06, 13, 0, 0).timetuple()) # TODO: remove temp
|
||||
CSV_OUT_FOLDER = '/var/tmp/catalyst/data/'
|
||||
CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/'
|
||||
CONN_RETRIES = 2
|
||||
|
||||
logbook.StderrHandler().push_application()
|
||||
@@ -119,26 +119,24 @@ class PoloniexDataGenerator(object):
|
||||
Makes sure data is up to date
|
||||
'''
|
||||
def to_dataframe(self, start, end, currencyPair=None):
    """
    Return the stored candles for `currencyPair` between `start` and `end`.

    Parameters
    ----------
    start : number
        Window start, in epoch seconds.
    end : number
        Window end, in epoch seconds; the slice stops at ``end - 1``.
    currencyPair : str
        Pair name used to build the CSV file name.

    Returns
    -------
    DataFrame
        Columns open/high/low/close/volume, indexed by 'date'.
    """
    csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv'
    last_date = self._get_start_date(csv_fn)
    if last_date + 300 < end or not os.path.exists(csv_fn):
        # get latest data
        self.append_data_single_pair(currencyPair)

    # CSV holds the latest snapshot
    df = pd.read_csv(
        csv_fn,
        names=['date', 'open', 'high', 'low', 'close', 'volume'],
    )
    df['date'] = pd.to_datetime(df['date'], unit='s')
    df.set_index('date', inplace=True)

    # The index was built from epoch seconds with pd.to_datetime, i.e. as
    # naive UTC. Convert the bounds the same way; datetime.fromtimestamp
    # would yield local time and shift the window by the UTC offset.
    window_start = pd.to_datetime(start, unit='s')
    window_end = pd.to_datetime(end - 1, unit='s')
    return df[window_start:window_end]
|
||||
|
||||
if __name__ == '__main__':
    # Refresh the local CSVs for every pair, then print one day of
    # USDT_BTC candles as a smoke test.
    pdg = PoloniexDataGenerator()
    pdg.get_currency_pairs()
    pdg.append_data()
    # Day/month written as 1 and 2, not 01 and 02: zero-prefixed literals
    # are octal in Python 2 and a SyntaxError in Python 3.
    df = pdg.to_dataframe(
        time.mktime(datetime(2017, 6, 1, 0, 0).timetuple()),
        time.mktime(datetime(2017, 6, 2, 0, 0).timetuple()),
        'USDT_BTC',
    )
    print(df)
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user